123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328 |
- package channel
- import (
- "encoding/base64"
- "encoding/json"
- "errors"
- "github.com/zeromicro/go-zero/core/collection"
- "github.com/zeromicro/go-zero/core/logx"
- "google.golang.org/protobuf/encoding/protojson"
- "regexp"
- "strconv"
- "strings"
- "time"
- "wechat-api/database/dao/wechat/model"
- "wechat-api/internal/pkg/customer_of_im/proto"
- "wechat-api/internal/pkg/wechat_ws"
- "wechat-api/internal/svc"
- "wechat-api/workphone"
- )
- type WechatChannel struct {
- *Channel
- wsClient *wechat_ws.WechatWsClient
- svcCtx *svc.ServiceContext
- }
- func NewWechatChannel(ws *wechat_ws.WechatWsClient, svcCtx *svc.ServiceContext) IChannel {
- wechatChannel := &WechatChannel{
- wsClient: ws,
- svcCtx: svcCtx,
- Channel: new(Channel),
- }
- wechatChannel.InitChannel(wechatChannel)
- return wechatChannel
- }
- func (w *WechatChannel) GetWxsCache() *collection.SafeMap {
- cacheKey := "WechatChannel_WxList"
- wxs := collection.NewSafeMap()
- v, exist := w.svcCtx.Cache.Get(cacheKey)
- if exist {
- value, ok := v.(*collection.SafeMap)
- if ok {
- wxs = value
- return wxs
- }
- }
- wxDao := w.svcCtx.WechatQ.Wx
- wxlist, err := wxDao.Where(wxDao.Ctype.Eq(1)).Find()
- if err != nil {
- logx.Error("获取微信列表失败:", err)
- } else {
- for _, wx := range wxlist {
- if wx.AgentID != 0 {
- wx.APIKey = w.svcCtx.Config.OpenAI.ApiKey
- wx.APIBase = w.svcCtx.Config.OpenAI.BaseUrl
- }
- wxs.Set(wx.Wxid, wx)
- }
- w.svcCtx.Cache.SetWithExpire(cacheKey, wxs, time.Minute*10)
- }
- return wxs
- }
- func (w *WechatChannel) GetWxInfo(wxid string) (*model.Wx, error) {
- wxs := w.GetWxsCache()
- v, exists := wxs.Get(wxid)
- if !exists {
- return nil, errors.New("未查找对应微信,不予处理")
- }
- wx, ok := v.(*model.Wx)
- if !ok {
- return nil, errors.New("微信信息转换失败")
- }
- return wx, nil
- }
- func (w *WechatChannel) GetContactCache(wxid string) (*model.Contact, error) {
- cacheKey := "WechatChannel_WxContact_" + wxid
- v, exist := w.svcCtx.Cache.Get(cacheKey)
- if exist {
- value, ok := v.(*model.Contact)
- if ok {
- return value, nil
- }
- }
- contactDao := w.svcCtx.WechatQ.Contact
- contact, err := contactDao.Where(contactDao.Wxid.Eq(wxid)).First()
- if err != nil {
- logx.Error("获取微信联系人失败:", err)
- return nil, err
- } else {
- w.svcCtx.Cache.SetWithExpire(cacheKey, contact, time.Hour*24)
- return contact, nil
- }
- }
- func (w *WechatChannel) OnMessage(msg *wechat_ws.MsgJsonObject) error {
- switch msg.MsgType {
- case "FriendTalkNotice":
- // 别人发过来的消息
- _ = w.FriendTalkHandle(msg.Message)
- case "WeChatTalkToFriendNotice":
- // 发给别人的消息
- _ = w.WeChatTalkToFriendHandle(msg.Message)
- default:
- //logx.Info("未知消息类型:", msg.MsgType, msg)
- }
- return nil
- }
- func (w *WechatChannel) Send(message *ReplyMessage) error {
- switch message.Type {
- case proto.EnumContentType_Text:
- err := w.SendText(message)
- if err != nil {
- logx.Error("发送文本消息失败:", err)
- return err
- }
- default:
- return errors.New("不支持的消息类型")
- }
- return nil
- }
- func (w *WechatChannel) SendText(message *ReplyMessage) error {
- friendId := message.ToUserId
- Remark := ""
- content := message.Content
- if message.IsGroup {
- friendId = message.GroupId
- Remark = message.ToUserId
- if message.IsAt && message.ToUserNickname != "" {
- content = "@" + message.ToUserNickname + " " + content
- }
- }
- encodedString := base64.StdEncoding.EncodeToString([]byte(content))
- msg := map[string]interface{}{
- "MsgType": "TalkToFriendTask",
- "Content": map[string]interface{}{
- "WeChatId": message.UserId,
- "FriendId": friendId,
- "ContentType": workphone.EnumContentType_Text,
- "Content": encodedString,
- "Remark": Remark,
- },
- }
- transportMessageJSON, err := json.Marshal(msg)
- if err != nil {
- logx.Error("failed to marshal message:", err)
- return err
- }
- w.wsClient.SendMsgByChan(transportMessageJSON)
- return nil
- }
- func (w *WechatChannel) GenerateRelevantContext(msg *ChatMessage) (*RelevantContext, error) {
- wx, err := w.GetWxInfo(msg.UserId)
- if err != nil {
- logx.Error("获取微信信息失败:", err)
- return nil, err
- }
- msg.ToUserNickname = wx.Nickname
- from, err := w.GetContactCache(msg.FromUserId)
- if err == nil {
- msg.FromUserNickname = from.Nickname
- }
- if msg.GroupId != "" {
- group, err := w.GetContactCache(msg.GroupId)
- if err == nil {
- msg.GroupName = group.Nickname
- }
- }
- rctx := &RelevantContext{
- APIKey: wx.APIKey,
- BaseURL: wx.APIBase,
- }
- if strings.HasPrefix(rctx.APIKey, "fastgpt") {
- rctx.IsFastGPT = true
- } else {
- rctx.IsFastGPT = false
- }
- return rctx, nil
- }
- func (w *WechatChannel) TextToChatMessage(msgStr string) (*ChatMessage, error) {
- message := &workphone.FriendTalkNoticeMessage{}
- err := protojson.Unmarshal([]byte(msgStr), message)
- if err != nil {
- return nil, err
- }
- logx.Info("文本消息结构体:", message)
- fromUserId := message.GetFriendId()
- toUserId := message.GetWeChatId()
- groupId := ""
- content := string(message.GetContent())
- isAt := false
- isGroup := false
- if strings.HasSuffix(message.GetFriendId(), "@chatroom") {
- groupId = message.GetFriendId()
- parts := strings.SplitN(content, ":\n", 2)
- if len(parts) > 0 {
- fromUserId = strings.TrimSpace(parts[0])
- if len(parts) > 1 {
- content = strings.TrimSpace(parts[1])
- // 使用正则表达式清理内容
- re := regexp.MustCompile(`@\S+ `)
- content = re.ReplaceAllString(content, "")
- }
- }
- isAt = strings.Contains(message.GetExt(), toUserId)
- if !isAt {
- //logx.Info("群聊消息不是发给当前用户的,无需处理")
- return nil, nil
- }
- isGroup = true
- } else {
- fromUserId = message.GetFriendId()
- toUserId = message.GetWeChatId()
- }
- msg := &ChatMessage{
- UserId: toUserId,
- MsgId: strconv.FormatInt(message.GetMsgSvrId(), 10),
- CreateTime: time.Unix(message.GetCreateTime(), 0),
- Type: proto.EnumContentType_Text,
- Content: content,
- FromUserId: fromUserId,
- FromUserNickname: "",
- ToUserId: toUserId,
- ToUserNickname: "",
- GroupId: groupId,
- GroupName: "",
- IsGroup: isGroup,
- IsAt: isAt,
- }
- return msg, nil
- }
- func (w *WechatChannel) FriendTalkHandle(msgStr string) error {
- message := &workphone.FriendTalkNoticeMessage{}
- err := protojson.Unmarshal([]byte(msgStr), message)
- if err != nil {
- logx.Error("FriendTalkNotice protojson.Unmarshal err:", err)
- return err
- }
- logx.Info("收到消息:", message)
- chatMsg := &ChatMessage{}
- switch message.ContentType {
- case workphone.EnumContentType_Text:
- logx.Info("这是本文消息")
- chatMsg, err = w.TextToChatMessage(msgStr)
- if err != nil {
- logx.Error("FriendTalkNotice TextToChatMessage err:", err)
- return err
- }
- default:
- //logx.Info("不支持的消息类型")
- return nil
- }
- if chatMsg == nil {
- //logx.Info("chatMsg is nil,无需处理")
- return nil
- }
- wxInfo, err := w.GetWxInfo(chatMsg.UserId)
- if err != nil {
- logx.Error("FriendTalkNotice GetWxInfo err:", err)
- return err
- }
- //白名单检查
- if !w.CheckAllowOrBlockList(wxInfo, chatMsg) {
- return errors.New("不在白名单中")
- }
- chatCtx, err := w.GenerateRelevantContext(chatMsg)
- if err != nil {
- logx.Info("FriendTalkNotice GenerateRelevantContext err:", err)
- return err
- }
- logx.Info("处理后的 chatMsg:", chatMsg)
- w.Produce(w.svcCtx, chatCtx, chatMsg)
- return nil
- }
- func (w *WechatChannel) WeChatTalkToFriendHandle(msgStr string) error {
- //logx.Info("收到发送的消息:", msgStr)
- return nil
- }
|