send_msg.go 8.7 KB


  1. package crontask
  2. import (
  3. "encoding/json"
  4. "net/url"
  5. "path"
  6. "time"
  7. "wechat-api/ent"
  8. "wechat-api/ent/batchmsg"
  9. "wechat-api/ent/contact"
  10. "wechat-api/ent/custom_types"
  11. "wechat-api/ent/labelrelationship"
  12. "wechat-api/ent/msg"
  13. "wechat-api/ent/wx"
  14. "wechat-api/hook"
  15. "wechat-api/internal/utils/dberrorhandler"
  16. )
  17. func (l *CronTask) sendMsg() {
  18. // 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据
  19. batchlist, err := l.svcCtx.DB.BatchMsg.Query().Where(batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1)).All(l.ctx)
  20. if err != nil {
  21. l.Logger.Errorf("batchlist err: %v", err)
  22. return
  23. }
  24. for _, batch := range batchlist {
  25. // 记录当前批次开始处理
  26. l.Logger.Info("batch start: ", batch.BatchNo)
  27. // 如果 批次 status 为 0,则先产生待发送消息
  28. if batch.Status == 0 {
  29. userList := make([]*ent.Contact, 0)
  30. groupList := make([]*ent.Contact, 0)
  31. tagMap := make(map[string][]uint64)
  32. err = json.Unmarshal([]byte(batch.Tagids), &tagMap)
  33. if err != nil {
  34. continue
  35. }
  36. l.Logger.Infof("send_msg.go batch.Tagids = %v\n", tagMap)
  37. var allContact, allGroup, ok bool
  38. var contactTags, groupTags []uint64
  39. if contactTags, ok = tagMap["contact_tag"]; ok {
  40. allContact = hasAll(contactTags, 0)
  41. l.Logger.Infof("contactTags=%v", contactTags)
  42. }
  43. if groupTags, ok = tagMap["group_tag"]; ok {
  44. allGroup = hasAll(groupTags, 0)
  45. l.Logger.Infof("groupTags=%v", groupTags)
  46. }
  47. var err error
  48. if allContact && allGroup {
  49. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据
  50. userList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeIn(1, 2)).All(l.ctx)
  51. if err != nil {
  52. l.Logger.Errorf("userlist err: %v", err)
  53. continue
  54. }
  55. } else {
  56. if allContact { // 所有联系人
  57. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1的数据
  58. userList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeEQ(1)).All(l.ctx)
  59. if err != nil {
  60. l.Logger.Errorf("userList err: %v", err)
  61. continue
  62. }
  63. } else { //获取指定标签的联系人
  64. userList, err = getContactList(l, contactTags, batch.Fromwxid, 1)
  65. if err != nil {
  66. l.Logger.Errorf("userList err: %v", err)
  67. continue
  68. }
  69. }
  70. if allGroup { //所有群
  71. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为2的数据
  72. groupList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeEQ(2)).All(l.ctx)
  73. if err != nil {
  74. l.Logger.Errorf("groupList err: %v", err)
  75. continue
  76. }
  77. } else { //获取指定标签的群
  78. groupList, err = getContactList(l, groupTags, batch.Fromwxid, 2)
  79. if err != nil {
  80. l.Logger.Errorf("groupList err: %v", err)
  81. continue
  82. }
  83. }
  84. if len(groupList) > 0 {
  85. userList = append(userList, groupList...)
  86. }
  87. }
  88. // 这里是待插入到 msg 表的数据
  89. msgs := make([]*ent.MsgCreate, 0)
  90. // 这里是把 batch.Msg 转换为 json 数组
  91. msgArray := make([]custom_types.Action, 0)
  92. err = json.Unmarshal([]byte(batch.Msg), &msgArray)
  93. l.Logger.Infof("msgArray length= %v, err:%v", len(msgArray), err)
  94. if err != nil {
  95. // json 解析失败
  96. msgArray = make([]custom_types.Action, 0)
  97. }
  98. for _, user := range userList {
  99. // 这里改动主要是 batch_msg 目前支持批量添加图文,导致 batch_msg 的 msg 字段为 json
  100. // msg 里包括文字和图片,msgtype=1 为文字, msgtype=2 为图片
  101. // 每一条文字或者图片 都是一条单独的消息
  102. if len(msgArray) > 0 {
  103. // 这里是新格式(msg内容为json),需要遍历数组
  104. for _, msgItem := range msgArray {
  105. msgRow := l.svcCtx.DB.Msg.Create().
  106. SetNotNilFromwxid(&batch.Fromwxid).
  107. SetNotNilToid(&user.Wxid).
  108. SetMsgtype(int32(msgItem.Type)).
  109. SetNotNilMsg(&msgItem.Content).
  110. SetStatus(0).
  111. SetNotNilBatchNo(&batch.BatchNo)
  112. msgs = append(msgs, msgRow)
  113. }
  114. }
  115. }
  116. if len(msgs) > 0 {
  117. // 加事务,批量操作一条 batch_msg 和 一堆 msg 信息
  118. tx, err := l.svcCtx.DB.Tx(l.ctx)
  119. if err != nil {
  120. l.Logger.Errorf("start db transaction err: %v", err)
  121. continue
  122. }
  123. _, err = tx.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
  124. if err != nil {
  125. _ = tx.Rollback()
  126. l.Logger.Errorf("batchmsg update err: %v", err)
  127. continue
  128. }
  129. _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
  130. if err != nil {
  131. _ = tx.Rollback()
  132. l.Logger.Errorf("msg CreateBulk err: %v", err)
  133. continue
  134. }
  135. _ = tx.Commit()
  136. } else {
  137. // 如果没有消息,直接更新批次状态为已发送
  138. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  139. SetStatus(2).
  140. SetTotal(0).
  141. SetSuccess(0).
  142. SetFail(0).
  143. Save(l.ctx)
  144. if err != nil {
  145. l.Logger.Errorf("batchmsg update err: %v", err)
  146. }
  147. continue
  148. }
  149. }
  150. // 获取当前批次的所有待发送消息
  151. msglist, err := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(0)).All(l.ctx)
  152. if err != nil {
  153. l.Logger.Errorf("msglist err: %v", err)
  154. continue
  155. }
  156. wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
  157. if err != nil {
  158. l.Logger.Errorf("wxInfo err: %v", err)
  159. continue
  160. }
  161. serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
  162. if err != nil {
  163. l.Logger.Errorf("serverInfo err: %v", err)
  164. continue
  165. }
  166. hookClient := hook.NewHook(serverInfo.PrivateIP, serverInfo.AdminPort, wxInfo.Port)
  167. //循环发送消息
  168. for _, msg := range msglist {
  169. // 这里之前只有文字消息(既 msgtype=1) 目前增加了图片 所以增加了msgtype=2
  170. // 所以增加了一个判断,判断发送的内容类型,如果是文字就调用SendTextMsg,如果是图片就调用SendPicMsg
  171. if msg.Msgtype == 1 {
  172. err = hookClient.SendTextMsg(msg.Toid, msg.Msg)
  173. } else if msg.Msgtype == 2 {
  174. diyfilename := getFileName(msg.Msg)
  175. err = hookClient.SendPicMsg(msg.Toid, msg.Msg, diyfilename)
  176. }
  177. // 每次发完暂停1秒
  178. time.Sleep(time.Second)
  179. if err != nil {
  180. l.Logger.Errorf("send msg err: %v", err)
  181. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
  182. if err != nil {
  183. l.Logger.Errorf("msg update err: %v", err)
  184. continue
  185. }
  186. continue
  187. }
  188. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
  189. if err != nil {
  190. l.Logger.Errorf("msg update err: %v", err)
  191. continue
  192. }
  193. }
  194. // 获取当前批次的所有发送的消息总数
  195. total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx)
  196. // 获取当前批次的所有发送成功的消息总数
  197. success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx)
  198. // 获取当前批次的所有发送失败的消息总数
  199. fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx)
  200. // 更新批次状态为已发送,同时更新发送总数、发送成功数量、失败数量、结束时间
  201. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  202. SetStatus(2).
  203. SetTotal(int32(total)).
  204. SetSuccess(int32(success)).
  205. SetFail(int32(fail)).
  206. SetStopTime(time.Now()).
  207. Save(l.ctx)
  208. if err != nil {
  209. l.Logger.Errorf("batchmsg update err: %v", err)
  210. continue
  211. }
  212. l.Logger.Info("batch stop: ", batch.BatchNo)
  213. }
  214. }
  215. // 根据URL获取图片名
  216. func getFileName(photoUrl string) string {
  217. u, err := url.Parse(photoUrl)
  218. if err != nil {
  219. return ""
  220. }
  221. return path.Base(u.Path)
  222. }
  223. func hasAll(array []uint64, target uint64) bool {
  224. for _, val := range array {
  225. if val == 0 {
  226. return true
  227. }
  228. }
  229. return false
  230. }
  231. func getContactList(l *CronTask, labels []uint64, fromWxId string, stype int) ([]*ent.Contact, error) {
  232. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  233. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labels...)).All(l.ctx)
  234. if err != nil {
  235. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  236. }
  237. contact_ids := make([]uint64, 0, len(labelrelationships))
  238. for _, labelrelationship := range labelrelationships {
  239. contact_ids = append(contact_ids, labelrelationship.ContactID)
  240. }
  241. userList := make([]*ent.Contact, 0)
  242. if len(contact_ids) > 0 {
  243. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  244. userList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(fromWxId), contact.IDIn(contact_ids...), contact.TypeEQ(stype)).All(l.ctx)
  245. if err != nil {
  246. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  247. }
  248. }
  249. return userList, nil
  250. }