send_msg.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package crontask
  2. import (
  3. "strings"
  4. "time"
  5. "wechat-api/ent"
  6. "wechat-api/ent/batchmsg"
  7. "wechat-api/ent/contact"
  8. "wechat-api/ent/label"
  9. "wechat-api/ent/labelrelationship"
  10. "wechat-api/ent/msg"
  11. "wechat-api/ent/wx"
  12. "wechat-api/hook"
  13. )
  14. func (l *CronTask) sendMsg() {
  15. // 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据
  16. batchlist, err := l.svcCtx.DB.BatchMsg.Query().Where(batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1)).All(l.ctx)
  17. if err != nil {
  18. l.Logger.Errorf("batchlist err: %v", err)
  19. return
  20. }
  21. for _, batch := range batchlist {
  22. // 记录当前批次开始处理
  23. l.Logger.Info("batch start: ", batch.BatchNo)
  24. // 如果 批次 status 为 0,则先产生待发送消息
  25. if batch.Status == 0 {
  26. userlist := make([]*ent.Contact, 0)
  27. if batch.Tag == "all" {
  28. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据
  29. userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeIn(1, 2)).All(l.ctx)
  30. if err != nil {
  31. l.Logger.Errorf("userlist err: %v", err)
  32. continue
  33. }
  34. } else {
  35. tags := strings.Split(batch.Tag, ",")
  36. // 获取 label 表中 name 为 tags的记录
  37. labids, err := l.svcCtx.DB.Label.Query().Where(label.NameIn(tags...)).IDs(l.ctx)
  38. if err != nil {
  39. l.Logger.Errorf("labids err: %v", err)
  40. continue
  41. }
  42. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  43. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labids...)).All(l.ctx)
  44. if err != nil {
  45. l.Logger.Errorf("labelrelationships err: %v", err)
  46. continue
  47. }
  48. contact_ids := make([]uint64, 0)
  49. for _, labelrelationship := range labelrelationships {
  50. contact_ids = append(contact_ids, labelrelationship.ContactID)
  51. }
  52. if len(contact_ids) > 0 {
  53. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  54. userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.IDIn(contact_ids...), contact.TypeIn(1, 2)).All(l.ctx)
  55. if err != nil {
  56. l.Logger.Errorf("userlist err: %v", err)
  57. continue
  58. }
  59. }
  60. }
  61. msgs := make([]*ent.MsgCreate, 0)
  62. for _, user := range userlist {
  63. msg := l.svcCtx.DB.Msg.Create().
  64. SetNotNilFromwxid(&batch.Fromwxid).
  65. SetNotNilToid(&user.Wxid).
  66. SetMsgtype(1).
  67. SetNotNilMsg(&batch.Msg).
  68. SetStatus(0).
  69. SetNotNilBatchNo(&batch.BatchNo)
  70. msgs = append(msgs, msg)
  71. }
  72. if len(msgs) > 0 {
  73. _, err = l.svcCtx.DB.Msg.CreateBulk(msgs...).Save(l.ctx)
  74. if err != nil {
  75. l.Logger.Errorf("msg CreateBulk err: %v", err)
  76. continue
  77. }
  78. } else {
  79. // 如果没有消息,直接更新批次状态为已发送
  80. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  81. SetStatus(2).
  82. SetTotal(0).
  83. SetSuccess(0).
  84. SetFail(0).
  85. Save(l.ctx)
  86. if err != nil {
  87. l.Logger.Errorf("batchmsg update err: %v", err)
  88. }
  89. continue
  90. }
  91. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
  92. if err != nil {
  93. l.Logger.Errorf("batchmsg update err: %v", err)
  94. continue
  95. }
  96. }
  97. // 获取当前批次的所有待发送消息
  98. msglist, err := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(0)).All(l.ctx)
  99. if err != nil {
  100. l.Logger.Errorf("msglist err: %v", err)
  101. continue
  102. }
  103. wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
  104. if err != nil {
  105. l.Logger.Errorf("wxInfo err: %v", err)
  106. continue
  107. }
  108. serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
  109. if err != nil {
  110. l.Logger.Errorf("serverInfo err: %v", err)
  111. continue
  112. }
  113. hookClient := hook.NewHook(serverInfo.PrivateIP, serverInfo.AdminPort, wxInfo.Port)
  114. //循环发送消息
  115. for _, msg := range msglist {
  116. err = hookClient.SendTextMsg(msg.Toid, msg.Msg)
  117. // 每次发完暂停1秒
  118. time.Sleep(time.Second)
  119. if err != nil {
  120. l.Logger.Errorf("send msg err: %v", err)
  121. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
  122. if err != nil {
  123. l.Logger.Errorf("msg update err: %v", err)
  124. continue
  125. }
  126. continue
  127. }
  128. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
  129. if err != nil {
  130. l.Logger.Errorf("msg update err: %v", err)
  131. continue
  132. }
  133. }
  134. // 获取当前批次的所有发送的消息总数
  135. total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx)
  136. // 获取当前批次的所有发送成功的消息总数
  137. success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx)
  138. // 获取当前批次的所有发送失败的消息总数
  139. fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx)
  140. // 更新批次状态为已发送,同时更新发送总数、发送成功数量、失败数量、结束时间
  141. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  142. SetStatus(2).
  143. SetTotal(int32(total)).
  144. SetSuccess(int32(success)).
  145. SetFail(int32(fail)).
  146. SetStopTime(time.Now()).
  147. Save(l.ctx)
  148. if err != nil {
  149. l.Logger.Errorf("batchmsg update err: %v", err)
  150. continue
  151. }
  152. l.Logger.Info("batch stop: ", batch.BatchNo)
  153. }
  154. }