send_msg.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. package crontask
  2. import (
  3. "encoding/json"
  4. "net/url"
  5. "path"
  6. "strings"
  7. "time"
  8. "wechat-api/ent"
  9. "wechat-api/ent/batchmsg"
  10. "wechat-api/ent/contact"
  11. "wechat-api/ent/custom_types"
  12. "wechat-api/ent/label"
  13. "wechat-api/ent/labelrelationship"
  14. "wechat-api/ent/msg"
  15. "wechat-api/ent/wx"
  16. "wechat-api/hook"
  17. )
  18. func (l *CronTask) sendMsg() {
  19. // 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据
  20. batchlist, err := l.svcCtx.DB.BatchMsg.Query().Where(batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1)).All(l.ctx)
  21. if err != nil {
  22. l.Logger.Errorf("batchlist err: %v", err)
  23. return
  24. }
  25. for _, batch := range batchlist {
  26. // 记录当前批次开始处理
  27. l.Logger.Info("batch start: ", batch.BatchNo)
  28. // 如果 批次 status 为 0,则先产生待发送消息
  29. if batch.Status == 0 {
  30. userlist := make([]*ent.Contact, 0)
  31. if batch.Tag == "all" {
  32. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据
  33. userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeIn(1, 2)).All(l.ctx)
  34. if err != nil {
  35. l.Logger.Errorf("userlist err: %v", err)
  36. continue
  37. }
  38. } else {
  39. tags := strings.Split(batch.Tag, ",")
  40. // 获取 label 表中 name 为 tags的记录
  41. labids, err := l.svcCtx.DB.Label.Query().Where(label.NameIn(tags...)).IDs(l.ctx)
  42. if err != nil {
  43. l.Logger.Errorf("labids err: %v", err)
  44. continue
  45. }
  46. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  47. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labids...)).All(l.ctx)
  48. if err != nil {
  49. l.Logger.Errorf("labelrelationships err: %v", err)
  50. continue
  51. }
  52. contact_ids := make([]uint64, 0)
  53. for _, labelrelationship := range labelrelationships {
  54. contact_ids = append(contact_ids, labelrelationship.ContactID)
  55. }
  56. if len(contact_ids) > 0 {
  57. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  58. userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.IDIn(contact_ids...), contact.TypeIn(1, 2)).All(l.ctx)
  59. if err != nil {
  60. l.Logger.Errorf("userlist err: %v", err)
  61. continue
  62. }
  63. }
  64. }
  65. // 这里是待插入到 msg 表的数据
  66. msgs := make([]*ent.MsgCreate, 0)
  67. // 这里是把 batch.Msg 转换为 json 数组
  68. msgArray := make([]custom_types.Action, 0)
  69. err := json.Unmarshal([]byte(batch.Msg), &msgArray)
  70. l.Logger.Infof("msgArray length= %v, err:%v", len(msgArray), err)
  71. if err != nil {
  72. // json 解析失败
  73. msgArray = make([]custom_types.Action, 0)
  74. }
  75. for _, user := range userlist {
  76. // 这里改动主要是 batch_msg 目前支持批量添加图文,导致 batch_msg 的 msg 字段为 json
  77. // msg 里包括文字和图片,msgtype=1 为文字, msgtype=2 为图片
  78. // 每一条文字或者图片 都是一条单独的消息
  79. if len(msgArray) > 0 {
  80. // 这里是新格式(msg内容为json),需要遍历数组
  81. for _, msgItem := range msgArray {
  82. msgRow := l.svcCtx.DB.Msg.Create().
  83. SetNotNilFromwxid(&batch.Fromwxid).
  84. SetNotNilToid(&user.Wxid).
  85. SetMsgtype(int32(msgItem.Type)).
  86. SetNotNilMsg(&msgItem.Content).
  87. SetStatus(0).
  88. SetNotNilBatchNo(&batch.BatchNo)
  89. msgs = append(msgs, msgRow)
  90. }
  91. }
  92. }
  93. if len(msgs) > 0 {
  94. // 加事务,批量操作一条 batch_msg 和 一堆 msg 信息
  95. tx, err := l.svcCtx.DB.Tx(l.ctx)
  96. if err != nil {
  97. l.Logger.Errorf("start db transaction err: %v", err)
  98. continue
  99. }
  100. _, err = tx.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
  101. if err != nil {
  102. _ = tx.Rollback()
  103. l.Logger.Errorf("batchmsg update err: %v", err)
  104. continue
  105. }
  106. _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
  107. if err != nil {
  108. _ = tx.Rollback()
  109. l.Logger.Errorf("msg CreateBulk err: %v", err)
  110. continue
  111. }
  112. _ = tx.Commit()
  113. } else {
  114. // 如果没有消息,直接更新批次状态为已发送
  115. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  116. SetStatus(2).
  117. SetTotal(0).
  118. SetSuccess(0).
  119. SetFail(0).
  120. Save(l.ctx)
  121. if err != nil {
  122. l.Logger.Errorf("batchmsg update err: %v", err)
  123. }
  124. continue
  125. }
  126. }
  127. // 获取当前批次的所有待发送消息
  128. msglist, err := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(0)).All(l.ctx)
  129. if err != nil {
  130. l.Logger.Errorf("msglist err: %v", err)
  131. continue
  132. }
  133. wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
  134. if err != nil {
  135. l.Logger.Errorf("wxInfo err: %v", err)
  136. continue
  137. }
  138. serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
  139. if err != nil {
  140. l.Logger.Errorf("serverInfo err: %v", err)
  141. continue
  142. }
  143. hookClient := hook.NewHook(serverInfo.PrivateIP, serverInfo.AdminPort, wxInfo.Port)
  144. //循环发送消息
  145. for _, msg := range msglist {
  146. // 这里之前只有文字消息(既 msgtype=1) 目前增加了图片 所以增加了msgtype=2
  147. // 所以增加了一个判断,判断发送的内容类型,如果是文字就调用SendTextMsg,如果是图片就调用SendPicMsg
  148. if msg.Msgtype == 1 {
  149. err = hookClient.SendTextMsg(msg.Toid, msg.Msg)
  150. } else if msg.Msgtype == 2 {
  151. diyfilename := getFileName(msg.Msg)
  152. err = hookClient.SendPicMsg(msg.Toid, msg.Msg, diyfilename)
  153. }
  154. // 每次发完暂停1秒
  155. time.Sleep(time.Second)
  156. if err != nil {
  157. l.Logger.Errorf("send msg err: %v", err)
  158. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
  159. if err != nil {
  160. l.Logger.Errorf("msg update err: %v", err)
  161. continue
  162. }
  163. continue
  164. }
  165. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
  166. if err != nil {
  167. l.Logger.Errorf("msg update err: %v", err)
  168. continue
  169. }
  170. }
  171. // 获取当前批次的所有发送的消息总数
  172. total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx)
  173. // 获取当前批次的所有发送成功的消息总数
  174. success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx)
  175. // 获取当前批次的所有发送失败的消息总数
  176. fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx)
  177. // 更新批次状态为已发送,同时更新发送总数、发送成功数量、失败数量、结束时间
  178. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  179. SetStatus(2).
  180. SetTotal(int32(total)).
  181. SetSuccess(int32(success)).
  182. SetFail(int32(fail)).
  183. SetStopTime(time.Now()).
  184. Save(l.ctx)
  185. if err != nil {
  186. l.Logger.Errorf("batchmsg update err: %v", err)
  187. continue
  188. }
  189. l.Logger.Info("batch stop: ", batch.BatchNo)
  190. }
  191. }
  192. // 根据URL获取图片名
  193. func getFileName(photoUrl string) string {
  194. u, err := url.Parse(photoUrl)
  195. if err != nil {
  196. return ""
  197. }
  198. return path.Base(u.Path)
  199. }