send_msg.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295
  1. package crontask
  2. import (
  3. "encoding/json"
  4. "net/url"
  5. "path"
  6. "time"
  7. "wechat-api/ent"
  8. "wechat-api/ent/batchmsg"
  9. "wechat-api/ent/contact"
  10. "wechat-api/ent/custom_types"
  11. "wechat-api/ent/labelrelationship"
  12. "wechat-api/ent/msg"
  13. "wechat-api/ent/wx"
  14. "wechat-api/hook"
  15. "wechat-api/internal/utils/dberrorhandler"
  16. )
  17. func (l *CronTask) sendMsg() {
  18. // 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据
  19. batchList, err := l.svcCtx.DB.BatchMsg.Query().Where(batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1)).All(l.ctx)
  20. if err != nil {
  21. l.Logger.Errorf("batchList err: %v", err)
  22. return
  23. }
  24. startTime := time.Now()
  25. for _, batch := range batchList {
  26. // 记录当前批次开始处理
  27. l.Logger.Info("batch start: ", batch.BatchNo)
  28. // 如果 批次 status 为 0,则先产生待发送消息
  29. if batch.Status == 0 {
  30. userList := make([]*ent.Contact, 0)
  31. groupList := make([]*ent.Contact, 0)
  32. tagMap := make(map[string][]uint64)
  33. err = json.Unmarshal([]byte(batch.Tagids), &tagMap)
  34. if err != nil {
  35. continue
  36. }
  37. l.Logger.Infof("send_msg.go batch.Tagids = %v\n", tagMap)
  38. var allContact, allGroup, ok bool
  39. var contactTags, groupTags []uint64
  40. if contactTags, ok = tagMap["contact_tag"]; ok {
  41. allContact = hasAll(contactTags, 0)
  42. l.Logger.Infof("contactTags=%v", contactTags)
  43. }
  44. if groupTags, ok = tagMap["group_tag"]; ok {
  45. allGroup = hasAll(groupTags, 0)
  46. l.Logger.Infof("groupTags=%v", groupTags)
  47. }
  48. var err error
  49. if allContact && allGroup {
  50. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据
  51. userList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeIn(1, 2)).All(l.ctx)
  52. if err != nil {
  53. l.Logger.Errorf("userlist err: %v", err)
  54. continue
  55. }
  56. } else {
  57. if allContact { // 所有联系人
  58. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1的数据
  59. userList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeEQ(1)).All(l.ctx)
  60. if err != nil {
  61. l.Logger.Errorf("userList err: %v", err)
  62. continue
  63. }
  64. } else { //获取指定标签的联系人
  65. userList, err = getContactList(l, contactTags, batch.Fromwxid, 1)
  66. if err != nil {
  67. l.Logger.Errorf("userList err: %v", err)
  68. continue
  69. }
  70. }
  71. if allGroup { //所有群
  72. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为2的数据
  73. groupList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeEQ(2)).All(l.ctx)
  74. if err != nil {
  75. l.Logger.Errorf("groupList err: %v", err)
  76. continue
  77. }
  78. } else { //获取指定标签的群
  79. groupList, err = getContactList(l, groupTags, batch.Fromwxid, 2)
  80. if err != nil {
  81. l.Logger.Errorf("groupList err: %v", err)
  82. continue
  83. }
  84. }
  85. if len(groupList) > 0 {
  86. userList = append(userList, groupList...)
  87. }
  88. }
  89. // 这里是待插入到 msg 表的数据
  90. msgs := make([]*ent.MsgCreate, 0)
  91. // 这里是把 batch.Msg 转换为 json 数组
  92. msgArray := make([]custom_types.Action, 0)
  93. err = json.Unmarshal([]byte(batch.Msg), &msgArray)
  94. l.Logger.Infof("msgArray length= %v, err:%v", len(msgArray), err)
  95. if err != nil {
  96. // json 解析失败
  97. msgArray = make([]custom_types.Action, 0)
  98. }
  99. for _, user := range userList {
  100. // 这里改动主要是 batch_msg 目前支持批量添加图文,导致 batch_msg 的 msg 字段为 json
  101. // msg 里包括文字和图片,msgtype=1 为文字, msgtype=2 为图片
  102. // 每一条文字或者图片 都是一条单独的消息
  103. if len(msgArray) > 0 {
  104. // 这里是新格式(msg内容为json),需要遍历数组
  105. for _, msgItem := range msgArray {
  106. msgRow := l.svcCtx.DB.Msg.Create().
  107. SetNotNilFromwxid(&batch.Fromwxid).
  108. SetNotNilToid(&user.Wxid).
  109. SetMsgtype(int32(msgItem.Type)).
  110. SetNotNilMsg(&msgItem.Content).
  111. SetStatus(0).
  112. SetNotNilBatchNo(&batch.BatchNo)
  113. msgs = append(msgs, msgRow)
  114. }
  115. }
  116. }
  117. if len(msgs) > 0 {
  118. // 加事务,批量操作一条 batch_msg 和 一堆 msg 信息
  119. tx, err := l.svcCtx.DB.Tx(l.ctx)
  120. if err != nil {
  121. l.Logger.Errorf("start db transaction err: %v", err)
  122. continue
  123. }
  124. _, err = tx.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
  125. if err != nil {
  126. _ = tx.Rollback()
  127. l.Logger.Errorf("batchmsg update err: %v", err)
  128. continue
  129. }
  130. _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
  131. if err != nil {
  132. _ = tx.Rollback()
  133. l.Logger.Errorf("msg CreateBulk err: %v", err)
  134. continue
  135. }
  136. _ = tx.Commit()
  137. } else {
  138. // 如果没有消息,直接更新批次状态为已发送
  139. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  140. SetStatus(2).
  141. SetTotal(0).
  142. SetSuccess(0).
  143. SetFail(0).
  144. Save(l.ctx)
  145. if err != nil {
  146. l.Logger.Errorf("batchmsg update err: %v", err)
  147. }
  148. continue
  149. }
  150. }
  151. // 获取当前批次的所有待发送消息
  152. msglist, err := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(0)).All(l.ctx)
  153. if err != nil {
  154. l.Logger.Errorf("msglist err: %v", err)
  155. continue
  156. }
  157. wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
  158. if err != nil {
  159. l.Logger.Errorf("wxInfo err: %v", err)
  160. continue
  161. }
  162. privateIP := ""
  163. adminPort := ""
  164. port := ""
  165. if wxInfo.ServerID != 0 {
  166. serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
  167. if err != nil {
  168. l.Logger.Errorf("serverInfo err: %v", err)
  169. continue
  170. }
  171. privateIP = serverInfo.PrivateIP
  172. adminPort = serverInfo.AdminPort
  173. port = wxInfo.Port
  174. }
  175. hookClient := hook.NewHook(privateIP, adminPort, port)
  176. //循环发送消息
  177. for _, msg := range msglist {
  178. // 这里之前只有文字消息(既 msgtype=1) 目前增加了图片 所以增加了msgtype=2
  179. // 所以增加了一个判断,判断发送的内容类型,如果是文字就调用SendTextMsg,如果是图片就调用SendPicMsg
  180. if msg.Msgtype == 1 {
  181. err = hookClient.SendTextMsg(msg.Toid, msg.Msg, wxInfo.Wxid)
  182. } else if msg.Msgtype == 2 {
  183. diyfilename := getFileName(msg.Msg)
  184. err = hookClient.SendPicMsg(msg.Toid, msg.Msg, diyfilename, wxInfo.Wxid)
  185. }
  186. // 每次发完暂停1秒
  187. time.Sleep(time.Second)
  188. if err != nil {
  189. l.Logger.Errorf("send msg err: %v", err)
  190. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
  191. if err != nil {
  192. l.Logger.Errorf("msg update err: %v", err)
  193. continue
  194. }
  195. continue
  196. }
  197. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
  198. if err != nil {
  199. l.Logger.Errorf("msg update err: %v", err)
  200. continue
  201. }
  202. }
  203. // 获取当前批次的所有发送的消息总数
  204. total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx)
  205. // 获取当前批次的所有发送成功的消息总数
  206. success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx)
  207. // 获取当前批次的所有发送失败的消息总数
  208. fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx)
  209. // 更新批次状态为已发送,同时更新发送总数、发送成功数量、失败数量、结束时间
  210. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  211. SetStatus(2).
  212. SetTotal(int32(total)).
  213. SetSuccess(int32(success)).
  214. SetFail(int32(fail)).
  215. SetStopTime(time.Now()).
  216. Save(l.ctx)
  217. if err != nil {
  218. l.Logger.Errorf("batchmsg update err: %v", err)
  219. continue
  220. }
  221. l.Logger.Info("batch stop: ", batch.BatchNo)
  222. }
  223. finishTime := time.Now()
  224. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  225. return
  226. }
  227. // 根据URL获取图片名
  228. func getFileName(photoUrl string) string {
  229. u, err := url.Parse(photoUrl)
  230. if err != nil {
  231. return ""
  232. }
  233. return path.Base(u.Path)
  234. }
  235. func hasAll(array []uint64, target uint64) bool {
  236. for _, val := range array {
  237. if val == 0 {
  238. return true
  239. }
  240. }
  241. return false
  242. }
  243. func getContactList(l *CronTask, labels []uint64, fromWxId string, stype int) ([]*ent.Contact, error) {
  244. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  245. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labels...)).All(l.ctx)
  246. if err != nil {
  247. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  248. }
  249. contact_ids := make([]uint64, 0, len(labelrelationships))
  250. for _, labelrelationship := range labelrelationships {
  251. contact_ids = append(contact_ids, labelrelationship.ContactID)
  252. }
  253. userList := make([]*ent.Contact, 0)
  254. if len(contact_ids) > 0 {
  255. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  256. userList, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(fromWxId), contact.IDIn(contact_ids...), contact.TypeEQ(stype)).All(l.ctx)
  257. if err != nil {
  258. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  259. }
  260. }
  261. return userList, nil
  262. }