customer_push_notice.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package wecom
  2. import (
  3. "context"
  4. "encoding/json"
  5. "entgo.io/ent/dialect/sql"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "strconv"
  9. "sync"
  10. "time"
  11. "wechat-api/ent"
  12. "wechat-api/ent/label"
  13. "wechat-api/ent/labellog"
  14. "wechat-api/ent/labelrelationship"
  15. "wechat-api/ent/wx"
  16. "wechat-api/internal/lock"
  17. "wechat-api/internal/pkg/wechat_ws"
  18. "wechat-api/internal/svc"
  19. "wechat-api/workphone/wecom"
  20. )
  21. type CustomerPushNoticeHandler struct {
  22. svcCtx *svc.ServiceContext
  23. lockMap sync.Map // 微信号 -> *sync.Mutex
  24. }
  25. func NewCustomerPushNoticeHandler(svcCtx *svc.ServiceContext) *CustomerPushNoticeHandler {
  26. return &CustomerPushNoticeHandler{
  27. svcCtx: svcCtx,
  28. }
  29. }
  30. // Handle 实现 MessageHandlerStrategy 接口
  31. func (f *CustomerPushNoticeHandler) Handle(ctx context.Context, msg *wechat_ws.MsgJsonObject, svcCtx *svc.ServiceContext) error {
  32. message := wecom.CustomerPushNoticeMessage{}
  33. err := json.Unmarshal([]byte(msg.Message), &message)
  34. logx.Infof("CustomerPushNotice.Message 的内容是:%s", msg.Message)
  35. if err != nil {
  36. logx.Errorf("Unmarshal.fail")
  37. return err
  38. }
  39. wxInfo, err := svcCtx.DB.Wx.Query().
  40. Where(
  41. wx.WxidEQ(message.WxId), // Additional filter by organizationId
  42. ).Only(ctx)
  43. if err != nil {
  44. return err
  45. }
  46. var labelRelationshipCreates []*ent.LabelRelationshipCreate
  47. var ctype uint64
  48. ctype = 3
  49. for _, friend := range message.Contacts {
  50. var friendId uint64
  51. //Wxid := strconv.FormatInt(friend.RemoteId, 10)
  52. friendType := 1
  53. if friend.Suffix == "微信" {
  54. friendType = 1
  55. } else {
  56. friendType = 4
  57. }
  58. list := friend.PhoneRemark
  59. var phone string
  60. if len(list) > 0 {
  61. phone = list[0]
  62. }
  63. var sex int
  64. if friend.Gender == "Male" {
  65. sex = 1
  66. } else if friend.Gender == "Female" {
  67. sex = 2
  68. } else {
  69. sex = 0
  70. }
  71. //修改拉黑后的状态被重置
  72. friendId, err = svcCtx.DB.Contact.Create().
  73. SetWxWxid(message.WxId).
  74. SetType(friendType).
  75. SetWxid(friend.RemoteId).
  76. SetNickname(friend.Name).
  77. SetMarkname(friend.Remark).
  78. SetHeadimg(friend.Avatar).
  79. SetPhone(phone).
  80. SetSex(sex).
  81. SetCtype(ctype).
  82. SetOrganizationID(wxInfo.OrganizationID).
  83. OnConflict().
  84. UpdateWxWxid().
  85. UpdateWxid().
  86. UpdateType().
  87. UpdateAccount().
  88. UpdateNickname().
  89. UpdateMarkname().
  90. UpdateHeadimg().
  91. UpdateOrganizationID().
  92. UpdatePhone().
  93. UpdateSex().
  94. UpdateCtype().
  95. ID(ctx)
  96. if err != nil {
  97. logx.Errorf("Contact.Create 失败, OrgID=%d, err=%v", wxInfo.OrganizationID, err)
  98. return err
  99. }
  100. //判断friend里的labelId="1,2,3,4,5"为空就不处理了,不为空的时候就查下label表里有没有这个labelId,没有就插入,有就跳过
  101. if friend.LabelIds == nil || len(friend.LabelIds) == 0 {
  102. logx.Infof("没有labelIds 失败, wx_wxId=%v", message.WxId)
  103. continue
  104. }
  105. labelIdsStr := friend.LabelIds
  106. var ids []uint64
  107. ids, err = stringSliceToUint64Slice(labelIdsStr)
  108. if err != nil {
  109. logx.Infof("labelstring切割失败, labelIds=%v", labelIdsStr)
  110. continue
  111. }
  112. LabelLogs, err := svcCtx.DB.LabelLog.Query().
  113. Where(labellog.LabelIDIn(ids...)).
  114. Where(labellog.WxID(message.WxId)).
  115. All(ctx)
  116. if err != nil || len(LabelLogs) == 0 {
  117. logx.Infof("labelLog.Query.fail: 跳过 || 或者查询失败 组织id:%d", wxInfo.OrganizationID)
  118. continue
  119. }
  120. //映射本地的name + type + model + organization_id
  121. currentOrgID := wxInfo.OrganizationID
  122. for _, remoteLabel := range LabelLogs {
  123. labelInfo, err := svcCtx.DB.Label.Query().Where(
  124. label.NameEQ(remoteLabel.LabelName),
  125. //label.StatusEQ(remoteLabel.LabelName),
  126. label.OrganizationID(currentOrgID),
  127. ).Only(ctx)
  128. if err != nil || ent.IsNotFound(err) {
  129. logx.Infof("label id %d not found.fail: ", wxInfo.OrganizationID)
  130. continue
  131. }
  132. //labelId, err := strconv.ParseUint(strconv.FormatUint(labelInfo.ID, 10), 10, 64)
  133. if err != nil {
  134. fmt.Println("转换出错:", err)
  135. continue
  136. }
  137. _, err = svcCtx.DB.LabelRelationship.Query().Where(
  138. labelrelationship.LabelIDEQ(labelInfo.ID),
  139. //label.StatusEQ(remoteLabel.LabelName),
  140. labelrelationship.ContactIDEQ(friendId),
  141. ).Only(ctx)
  142. if err != nil || ent.IsNotFound(err) {
  143. labelRelationshipCreates = append(labelRelationshipCreates,
  144. svcCtx.DB.LabelRelationship.Create().
  145. //SetID(int(label.LabelId)).
  146. SetOrganizationID(wxInfo.OrganizationID).
  147. SetContactID(friendId).
  148. SetStatus(1).
  149. SetLabelID(labelInfo.ID).
  150. SetCreatedAt(time.Now()).
  151. SetUpdatedAt(time.Now()),
  152. )
  153. }
  154. }
  155. }
  156. if len(labelRelationshipCreates) > 0 {
  157. lock.GetWxIdLockManager().RunWithLock(message.WxId, func() {
  158. errShip := svcCtx.DB.LabelRelationship.CreateBulk(labelRelationshipCreates...).
  159. OnConflict(
  160. sql.ConflictColumns(labelrelationship.FieldLabelID, labelrelationship.FieldContactID),
  161. ).DoNothing().Exec(ctx)
  162. if errShip != nil {
  163. logx.Error("label_relationship.create.fail: ", wxInfo.OrganizationID, labelRelationshipCreates)
  164. }
  165. })
  166. }
  167. return nil
  168. }
  // stringSliceToUint64Slice parses every element of strSlice as a base-10
  // uint64. Elements that do not parse are reported on stdout and left out of
  // the result, so the returned slice may be shorter than the input. The
  // returned error is always nil.
  func stringSliceToUint64Slice(strSlice []string) ([]uint64, error) {
  	parsed := make([]uint64, 0, len(strSlice))
  	for i := 0; i < len(strSlice); i++ {
  		value, parseErr := strconv.ParseUint(strSlice[i], 10, 64)
  		if parseErr == nil {
  			parsed = append(parsed, value)
  			continue
  		}
  		// Bad elements are skipped; returning the error here would be the
  		// alternative.
  		fmt.Printf("转换失败: %v\n", parseErr)
  	}
  	return parsed, nil
  }
  181. func (f *CustomerPushNoticeHandler) getWxLock(wxid string) *sync.Mutex {
  182. actual, _ := f.lockMap.LoadOrStore(wxid, &sync.Mutex{})
  183. return actual.(*sync.Mutex)
  184. }