contact_Label_info_notice.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package MessageHandlers
  2. import (
  3. "context"
  4. "encoding/json"
  5. "entgo.io/ent/dialect/sql"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "strconv"
  8. "time"
  9. "wechat-api/ent"
  10. "wechat-api/ent/wx"
  11. "wechat-api/internal/pkg/wechat_ws"
  12. "wechat-api/internal/svc"
  13. "wechat-api/workphone"
  14. )
  15. type ContactLabelInfoNotice struct {
  16. svcCtx *svc.ServiceContext
  17. }
  18. func NewContactLabelInfoNotice(svcCtx *svc.ServiceContext) *ContactLabelInfoNotice {
  19. return &ContactLabelInfoNotice{
  20. svcCtx: svcCtx,
  21. }
  22. }
  23. // Handle 实现 MessageHandlerStrategy 接口
  24. func (f *ContactLabelInfoNotice) Handle(ctx context.Context, msg *wechat_ws.MsgJsonObject, svcCtx *svc.ServiceContext) error {
  25. message := workphone.ContactLabelInfoNoticeMessage{}
  26. logx.Infof("msg.Message 的内容是:%s", msg.Message)
  27. err := json.Unmarshal([]byte(msg.Message), &message)
  28. if err != nil {
  29. return err
  30. }
  31. // 拿到租户 id
  32. wxInfo, err := svcCtx.DB.Wx.Query().
  33. Where(
  34. wx.WxidEQ(message.WeChatId), // Additional filter by organizationId
  35. ).
  36. Only(ctx)
  37. if err != nil {
  38. return err
  39. }
  40. var bulkCreates []*ent.LabelLogCreate
  41. var labelCreates []*ent.LabelCreate
  42. for _, label := range message.Labels {
  43. labelIDSet := make(map[int]struct{})
  44. labelIDSet[int(label.LabelId)] = struct{}{}
  45. var labelIDs []int
  46. for id := range labelIDSet {
  47. labelIDs = append(labelIDs, id)
  48. }
  49. existingMap := make(map[int]struct{})
  50. // 只拼接还没插入过的 label_id + organization_id 组合
  51. if _, exists := existingMap[int(label.LabelId)]; exists {
  52. logx.Error("label_log---已经存在的: ", wxInfo.OrganizationID, label.LabelId)
  53. continue
  54. }
  55. tsInt, err := strconv.ParseInt(label.CreateTime, 10, 64)
  56. if err != nil {
  57. logx.Errorf("时间戳转换失败: %v", err)
  58. continue
  59. }
  60. bulkCreates = append(bulkCreates,
  61. svcCtx.DB.LabelLog.Create().
  62. SetLabelName(label.LabelName).
  63. SetLabelID(int(label.LabelId)).
  64. SetOrganizationID(wxInfo.OrganizationID).
  65. SetWxID(message.WeChatId).
  66. SetCreatedAt(time.Unix(tsInt/1000, 0)),
  67. //SetUpdatedAt(time.Now()),
  68. )
  69. //label 主表
  70. labelCreates = append(labelCreates,
  71. svcCtx.DB.Label.Create().
  72. //SetID(int(label.LabelId)).
  73. SetName(label.LabelName).
  74. SetType(1).
  75. SetStatus(1).
  76. SetOrganizationID(wxInfo.OrganizationID).
  77. SetFrom(2).
  78. SetMode(2).
  79. SetConditions(string(json.RawMessage(`{}`))).
  80. SetCreatedAt(time.Now()).
  81. SetUpdatedAt(time.Now()),
  82. )
  83. logx.Info("数据:", label.LabelName+"-----", label.LabelId, wxInfo.OrganizationID, message.WeChatId, time.Unix(tsInt/1000, 0))
  84. }
  85. //批量插入labelLog
  86. if len(bulkCreates) > 0 {
  87. err := svcCtx.DB.LabelLog.CreateBulk(bulkCreates...).
  88. OnConflict(
  89. sql.ConflictColumns("label_id", "organization_id"),
  90. ).
  91. DoNothing().
  92. Exec(ctx)
  93. if err != nil {
  94. logx.Error("labelLog 批量插入失败", bulkCreates)
  95. //return err
  96. }
  97. }
  98. //批量插入label
  99. if len(labelCreates) > 0 {
  100. err := svcCtx.DB.Label.CreateBulk(labelCreates...).
  101. OnConflict(
  102. sql.ConflictColumns("name", "organization_id"),
  103. ).
  104. DoNothing().
  105. Exec(ctx)
  106. if err != nil {
  107. logx.Error("label 批量插入失败", labelCreates)
  108. return err
  109. }
  110. }
  111. return nil
  112. }