user_label_push_notice.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package wecom
  2. import (
  3. "context"
  4. "encoding/json"
  5. "entgo.io/ent/dialect/sql"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "strconv"
  9. "time"
  10. "wechat-api/ent"
  11. "wechat-api/ent/label"
  12. "wechat-api/ent/labellog"
  13. "wechat-api/ent/wx"
  14. "wechat-api/internal/lock"
  15. "wechat-api/internal/pkg/wechat_ws"
  16. "wechat-api/internal/svc"
  17. "wechat-api/workphone/wecom"
  18. )
  19. type UserLabelPushNotice struct {
  20. svcCtx *svc.ServiceContext
  21. }
  22. func NewUserLabelPushNotice(svcCtx *svc.ServiceContext) *UserLabelPushNotice {
  23. return &UserLabelPushNotice{
  24. svcCtx: svcCtx,
  25. }
  26. }
  27. // Handle 实现 MessageHandlerStrategy 接口
  28. func (f *UserLabelPushNotice) Handle(ctx context.Context, msg *wechat_ws.MsgJsonObject, svcCtx *svc.ServiceContext) error {
  29. var message wecom.UserLabelPushNoticeMessage
  30. logx.Infof("msg.Message 的内容是:%s", msg.Message)
  31. if err := json.Unmarshal([]byte(msg.Message), &message); err != nil {
  32. return err
  33. }
  34. //WxWxId := strconv.FormatInt(message.WxId, 10) //属主微信id
  35. wxInfo, err := svcCtx.DB.Wx.Query().
  36. Where(wx.WxidEQ(message.WxId)).
  37. Only(ctx)
  38. if err != nil {
  39. return err
  40. }
  41. // 整理标签ID和名称列表
  42. labelIDs := make([]uint64, 0, len(message.LabelGroups))
  43. labelNameSet := make(map[string]struct{}) // 用于去重
  44. for _, labelInfoMessage := range message.LabelGroups {
  45. if len(labelInfoMessage.Labels) == 0 {
  46. continue
  47. }
  48. for _, LabelInfo := range labelInfoMessage.Labels {
  49. labelID, err := strconv.ParseUint(LabelInfo.Id, 10, 64)
  50. if err != nil {
  51. fmt.Println("labelId 类型转成失败:", err)
  52. continue
  53. }
  54. labelIDs = append(labelIDs, labelID)
  55. labelNameSet[labelInfoMessage.Name+"-"+LabelInfo.Name] = struct{}{}
  56. }
  57. }
  58. // 提前查询现有 LabelLog 和 Label
  59. existingLabelLogs, err := svcCtx.DB.LabelLog.Query().
  60. Where(
  61. labellog.LabelIDIn(labelIDs...),
  62. labellog.OrganizationID(wxInfo.OrganizationID),
  63. labellog.WxID(message.WxId),
  64. ).
  65. Select(labellog.FieldLabelID).
  66. All(ctx)
  67. if err != nil {
  68. return fmt.Errorf("查询 LabelLog 失败: %w", err)
  69. }
  70. existingLabels, err := svcCtx.DB.Label.Query().
  71. Where(
  72. label.NameIn(keysFromMap(labelNameSet)...),
  73. label.OrganizationID(wxInfo.OrganizationID),
  74. ).
  75. Select(label.FieldName).
  76. All(ctx)
  77. if err != nil {
  78. return fmt.Errorf("查询 Label 失败: %w", err)
  79. }
  80. existingLabelLogMap := make(map[uint64]struct{})
  81. for _, log := range existingLabelLogs {
  82. existingLabelLogMap[log.LabelID] = struct{}{}
  83. }
  84. existingLabelMap := make(map[string]struct{})
  85. for _, lab := range existingLabels {
  86. existingLabelMap[lab.Name] = struct{}{}
  87. }
  88. var bulkLabelLogs []*ent.LabelLogCreate
  89. var bulkLabels []*ent.LabelCreate
  90. for _, labelWx := range message.LabelGroups {
  91. if len(labelWx.Labels) == 0 {
  92. continue
  93. }
  94. for _, labelChildInfo := range labelWx.Labels {
  95. // 时间戳处理
  96. tsInt := int64(labelChildInfo.CreateTime)
  97. // 插入 LabelLog
  98. labelWecomId, err := StringToUint64(labelChildInfo.Id)
  99. if err != nil {
  100. logx.Errorf("时间戳转换失败: %v (Label ID: %d)", err, labelWx.Id)
  101. continue
  102. }
  103. s := labelWx.Name + "-" + labelChildInfo.Name
  104. if _, exists := existingLabelLogMap[labelWecomId]; !exists {
  105. bulkLabelLogs = append(bulkLabelLogs,
  106. svcCtx.DB.LabelLog.Create().
  107. SetLabelName(s).
  108. SetLabelID(labelWecomId).
  109. SetOrganizationID(wxInfo.OrganizationID).
  110. SetWxID(message.WxId).
  111. SetCreatedAt(time.Unix(tsInt, 0)),
  112. )
  113. }
  114. // 插入 Label
  115. if _, exists := existingLabelMap[s]; !exists {
  116. bulkLabels = append(bulkLabels,
  117. svcCtx.DB.Label.Create().
  118. SetName(s).
  119. SetType(4).
  120. SetStatus(1).
  121. SetOrganizationID(wxInfo.OrganizationID).
  122. SetFrom(3). // 标签来源:1后台创建 2个微同步 3企业微信同步
  123. SetMode(1).
  124. SetConditions(`{}`).
  125. SetCreatedAt(time.Now()).
  126. SetUpdatedAt(time.Now()),
  127. )
  128. }
  129. }
  130. }
  131. lock.LockWxId(message.WxId)
  132. defer lock.UnlockWxId(message.WxId)
  133. // 批量插入 LabelLog
  134. if len(bulkLabelLogs) > 0 {
  135. err := svcCtx.DB.LabelLog.CreateBulk(bulkLabelLogs...).
  136. OnConflict(sql.ConflictColumns(labellog.FieldLabelID, labellog.FieldWxID, labellog.FieldOrganizationID)).
  137. DoNothing().
  138. Exec(ctx)
  139. if err != nil {
  140. logx.Error("labelLog 批量插入失败", bulkLabelLogs)
  141. }
  142. }
  143. // 批量插入 Label
  144. if len(bulkLabels) > 0 {
  145. err := svcCtx.DB.Label.CreateBulk(bulkLabels...).
  146. OnConflict(sql.ConflictColumns(label.FieldName, label.FieldOrganizationID)).
  147. DoNothing().
  148. Exec(ctx)
  149. if err != nil {
  150. logx.Error("label 批量插入失败", bulkLabels)
  151. return err
  152. }
  153. }
  154. return nil
  155. }
  156. func keysFromMap(m map[string]struct{}) []string {
  157. keys := make([]string, 0, len(m))
  158. for k := range m {
  159. keys = append(keys, k)
  160. }
  161. return keys
  162. }
  163. func StringToUint64(s string) (uint64, error) {
  164. return strconv.ParseUint(s, 10, 64)
  165. }