contact_Label_info_notice.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package MessageHandlers
  2. import (
  3. "context"
  4. "encoding/json"
  5. "entgo.io/ent/dialect/sql"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "strconv"
  9. "time"
  10. "wechat-api/ent"
  11. "wechat-api/ent/label"
  12. "wechat-api/ent/labellog"
  13. "wechat-api/ent/wx"
  14. "wechat-api/internal/lock"
  15. "wechat-api/internal/pkg/wechat_ws"
  16. "wechat-api/internal/svc"
  17. "wechat-api/workphone"
  18. )
  19. type ContactLabelInfoNotice struct {
  20. svcCtx *svc.ServiceContext
  21. }
  22. func NewContactLabelInfoNotice(svcCtx *svc.ServiceContext) *ContactLabelInfoNotice {
  23. return &ContactLabelInfoNotice{
  24. svcCtx: svcCtx,
  25. }
  26. }
  27. // Handle 实现 MessageHandlerStrategy 接口
  28. func (f *ContactLabelInfoNotice) Handle(ctx context.Context, msg *wechat_ws.MsgJsonObject, svcCtx *svc.ServiceContext) error {
  29. var message workphone.ContactLabelInfoNoticeMessage
  30. logx.Infof("msg.Message 的内容是:%s", msg.Message)
  31. if err := json.Unmarshal([]byte(msg.Message), &message); err != nil {
  32. return err
  33. }
  34. wxInfo, err := svcCtx.DB.Wx.Query().
  35. Where(wx.WxidEQ(message.WeChatId)).
  36. Only(ctx)
  37. if err != nil {
  38. return err
  39. }
  40. // 整理标签ID和名称列表
  41. labelIDs := make([]int, 0, len(message.Labels))
  42. labelNameSet := make(map[string]struct{}) // 用于去重
  43. for _, labelInfoMessage := range message.Labels {
  44. labelIDs = append(labelIDs, int(labelInfoMessage.LabelId))
  45. labelNameSet[labelInfoMessage.LabelName] = struct{}{}
  46. }
  47. // 提前查询现有 LabelLog 和 Label
  48. existingLabelLogs, err := svcCtx.DB.LabelLog.Query().
  49. Where(
  50. labellog.LabelIDIn(labelIDs...),
  51. labellog.OrganizationID(wxInfo.OrganizationID),
  52. labellog.WxID(message.WeChatId),
  53. ).
  54. Select(labellog.FieldLabelID).
  55. All(ctx)
  56. if err != nil {
  57. return fmt.Errorf("查询 LabelLog 失败: %w", err)
  58. }
  59. existingLabels, err := svcCtx.DB.Label.Query().
  60. Where(
  61. label.NameIn(keysFromMap(labelNameSet)...),
  62. label.OrganizationID(wxInfo.OrganizationID),
  63. ).
  64. Select(label.FieldName).
  65. All(ctx)
  66. if err != nil {
  67. return fmt.Errorf("查询 Label 失败: %w", err)
  68. }
  69. existingLabelLogMap := make(map[int]struct{})
  70. for _, log := range existingLabelLogs {
  71. existingLabelLogMap[log.LabelID] = struct{}{}
  72. }
  73. existingLabelMap := make(map[string]struct{})
  74. for _, lab := range existingLabels {
  75. existingLabelMap[lab.Name] = struct{}{}
  76. }
  77. var bulkLabelLogs []*ent.LabelLogCreate
  78. var bulkLabels []*ent.LabelCreate
  79. for _, labelWx := range message.Labels {
  80. // 时间戳处理
  81. tsInt, err := strconv.ParseInt(labelWx.CreateTime, 10, 64)
  82. if err != nil {
  83. logx.Errorf("时间戳转换失败: %v (Label ID: %d)", err, labelWx.LabelId)
  84. //continue
  85. tsInt = time.Now().UnixMilli()
  86. }
  87. // 插入 LabelLog
  88. if _, exists := existingLabelLogMap[int(labelWx.LabelId)]; !exists {
  89. bulkLabelLogs = append(bulkLabelLogs,
  90. svcCtx.DB.LabelLog.Create().
  91. SetLabelName(labelWx.LabelName).
  92. SetLabelID(int(labelWx.LabelId)).
  93. SetOrganizationID(wxInfo.OrganizationID).
  94. SetWxID(message.WeChatId).
  95. SetCreatedAt(time.Unix(tsInt/1000, 0)),
  96. )
  97. }
  98. // 插入 Label
  99. if _, exists := existingLabelMap[labelWx.LabelName]; !exists {
  100. bulkLabels = append(bulkLabels,
  101. svcCtx.DB.Label.Create().
  102. SetName(labelWx.LabelName).
  103. SetType(1).
  104. SetStatus(1).
  105. SetOrganizationID(wxInfo.OrganizationID).
  106. SetFrom(2).
  107. SetMode(1).
  108. SetConditions(`{}`).
  109. SetCreatedAt(time.Now()).
  110. SetUpdatedAt(time.Now()),
  111. )
  112. }
  113. }
  114. lock.LockWxId(message.WeChatId)
  115. defer lock.UnlockWxId(message.WeChatId)
  116. // 批量插入 LabelLog
  117. if len(bulkLabelLogs) > 0 {
  118. err := svcCtx.DB.LabelLog.CreateBulk(bulkLabelLogs...).
  119. OnConflict(sql.ConflictColumns(labellog.FieldLabelID, labellog.FieldWxID, labellog.FieldOrganizationID)).
  120. DoNothing().
  121. Exec(ctx)
  122. if err != nil {
  123. logx.Error("labelLog 批量插入失败", bulkLabelLogs)
  124. }
  125. }
  126. // 批量插入 Label
  127. if len(bulkLabels) > 0 {
  128. err := svcCtx.DB.Label.CreateBulk(bulkLabels...).
  129. OnConflict(sql.ConflictColumns(label.FieldName, label.FieldOrganizationID)).
  130. DoNothing().
  131. Exec(ctx)
  132. if err != nil {
  133. logx.Error("label 批量插入失败", bulkLabels)
  134. return err
  135. }
  136. }
  137. return nil
  138. }
  // keysFromMap returns the keys of the set m as a slice. The order follows map
  // iteration and is therefore unspecified; the result is never nil.
  func keysFromMap(m map[string]struct{}) []string {
  	names := make([]string, len(m))
  	next := 0
  	for name := range m {
  		names[next] = name
  		next++
  	}
  	return names
  }