contact_Label_info_notice.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package MessageHandlers
  2. import (
  3. "context"
  4. "encoding/json"
  5. "entgo.io/ent/dialect/sql"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "strconv"
  9. "time"
  10. "wechat-api/ent"
  11. "wechat-api/ent/label"
  12. "wechat-api/ent/labellog"
  13. "wechat-api/ent/wx"
  14. "wechat-api/internal/pkg/wechat_ws"
  15. "wechat-api/internal/svc"
  16. "wechat-api/workphone"
  17. )
  18. type ContactLabelInfoNotice struct {
  19. svcCtx *svc.ServiceContext
  20. }
  21. func NewContactLabelInfoNotice(svcCtx *svc.ServiceContext) *ContactLabelInfoNotice {
  22. return &ContactLabelInfoNotice{
  23. svcCtx: svcCtx,
  24. }
  25. }
  26. // Handle 实现 MessageHandlerStrategy 接口
  27. func (f *ContactLabelInfoNotice) Handle(ctx context.Context, msg *wechat_ws.MsgJsonObject, svcCtx *svc.ServiceContext) error {
  28. var message workphone.ContactLabelInfoNoticeMessage
  29. logx.Infof("msg.Message 的内容是:%s", msg.Message)
  30. if err := json.Unmarshal([]byte(msg.Message), &message); err != nil {
  31. return err
  32. }
  33. wxInfo, err := svcCtx.DB.Wx.Query().
  34. Where(wx.WxidEQ(message.WeChatId)).
  35. Only(ctx)
  36. if err != nil {
  37. return err
  38. }
  39. // 整理标签ID和名称列表
  40. labelIDs := make([]int, 0, len(message.Labels))
  41. labelNameSet := make(map[string]struct{}) // 用于去重
  42. for _, labelInfoMessage := range message.Labels {
  43. labelIDs = append(labelIDs, int(labelInfoMessage.LabelId))
  44. labelNameSet[labelInfoMessage.LabelName] = struct{}{}
  45. }
  46. // 提前查询现有 LabelLog 和 Label
  47. existingLabelLogs, err := svcCtx.DB.LabelLog.Query().
  48. Where(
  49. labellog.LabelIDIn(labelIDs...),
  50. labellog.OrganizationID(wxInfo.OrganizationID),
  51. labellog.WxID(message.WeChatId),
  52. ).
  53. Select(labellog.FieldLabelID).
  54. All(ctx)
  55. if err != nil {
  56. return fmt.Errorf("查询 LabelLog 失败: %w", err)
  57. }
  58. existingLabels, err := svcCtx.DB.Label.Query().
  59. Where(
  60. label.NameIn(keysFromMap(labelNameSet)...),
  61. label.OrganizationID(wxInfo.OrganizationID),
  62. ).
  63. Select(label.FieldName).
  64. All(ctx)
  65. if err != nil {
  66. return fmt.Errorf("查询 Label 失败: %w", err)
  67. }
  68. existingLabelLogMap := make(map[int]struct{})
  69. for _, log := range existingLabelLogs {
  70. existingLabelLogMap[log.LabelID] = struct{}{}
  71. }
  72. existingLabelMap := make(map[string]struct{})
  73. for _, lab := range existingLabels {
  74. existingLabelMap[lab.Name] = struct{}{}
  75. }
  76. var bulkLabelLogs []*ent.LabelLogCreate
  77. var bulkLabels []*ent.LabelCreate
  78. for _, labelWx := range message.Labels {
  79. // 时间戳处理
  80. tsInt, err := strconv.ParseInt(labelWx.CreateTime, 10, 64)
  81. if err != nil {
  82. logx.Errorf("时间戳转换失败: %v (Label ID: %d)", err, labelWx.LabelId)
  83. continue
  84. }
  85. // 插入 LabelLog
  86. if _, exists := existingLabelLogMap[int(labelWx.LabelId)]; !exists {
  87. bulkLabelLogs = append(bulkLabelLogs,
  88. svcCtx.DB.LabelLog.Create().
  89. SetLabelName(labelWx.LabelName).
  90. SetLabelID(int(labelWx.LabelId)).
  91. SetOrganizationID(wxInfo.OrganizationID).
  92. SetWxID(message.WeChatId).
  93. SetCreatedAt(time.Unix(tsInt/1000, 0)),
  94. )
  95. }
  96. // 插入 Label
  97. if _, exists := existingLabelMap[labelWx.LabelName]; !exists {
  98. bulkLabels = append(bulkLabels,
  99. svcCtx.DB.Label.Create().
  100. SetName(labelWx.LabelName).
  101. SetType(1).
  102. SetStatus(1).
  103. SetOrganizationID(wxInfo.OrganizationID).
  104. SetFrom(2).
  105. SetMode(1).
  106. SetConditions(`{}`).
  107. SetCreatedAt(time.Now()).
  108. SetUpdatedAt(time.Now()),
  109. )
  110. }
  111. }
  112. // 批量插入 LabelLog
  113. if len(bulkLabelLogs) > 0 {
  114. err := svcCtx.DB.LabelLog.CreateBulk(bulkLabelLogs...).
  115. OnConflict(sql.ConflictColumns(labellog.FieldLabelID, labellog.FieldWxID, labellog.FieldOrganizationID)).
  116. DoNothing().
  117. Exec(ctx)
  118. if err != nil {
  119. logx.Error("labelLog 批量插入失败", bulkLabelLogs)
  120. }
  121. }
  122. // 批量插入 Label
  123. if len(bulkLabels) > 0 {
  124. err := svcCtx.DB.Label.CreateBulk(bulkLabels...).
  125. OnConflict(sql.ConflictColumns(label.FieldName, label.FieldOrganizationID)).
  126. DoNothing().
  127. Exec(ctx)
  128. if err != nil {
  129. logx.Error("label 批量插入失败", bulkLabels)
  130. return err
  131. }
  132. }
  133. return nil
  134. }
  // keysFromMap returns the keys of the set m as a slice. The result is never
  // nil, and its order follows Go's map iteration order, which is unspecified.
  func keysFromMap(m map[string]struct{}) []string {
  	names := make([]string, len(m))
  	next := 0
  	for name := range m {
  		names[next] = name
  		next++
  	}
  	return names
  }