send_msg.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package crontask
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/url"
  6. "path"
  7. "time"
  8. "wechat-api/ent"
  9. "wechat-api/ent/batchmsg"
  10. "wechat-api/ent/contact"
  11. "wechat-api/ent/custom_types"
  12. "wechat-api/ent/labelrelationship"
  13. "wechat-api/ent/msg"
  14. "wechat-api/ent/wx"
  15. "wechat-api/hook"
  16. "wechat-api/internal/utils/dberrorhandler"
  17. )
  18. func (l *CronTask) sendMsg() {
  19. // 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据
  20. batchList, err := l.svcCtx.DB.BatchMsg.Query().Where(
  21. batchmsg.StartTimeLT(time.Now()),
  22. batchmsg.StatusIn(0, 1),
  23. batchmsg.CtypeIn(1, 3),
  24. ).All(l.ctx)
  25. l.Logger.Infof("send_msg.go BatchList %v\n", batchList)
  26. if err != nil {
  27. l.Logger.Errorf("batchList err: %v\n", err)
  28. return
  29. }
  30. startTime := time.Now()
  31. for _, batch := range batchList {
  32. // 记录当前批次开始处理
  33. l.Logger.Infof("batch start: %s\n", batch.BatchNo)
  34. // 如果 批次 status 为 0,则先产生待发送消息
  35. if batch.Status == 0 {
  36. userList := make([]*ent.Contact, 0)
  37. groupList := make([]*ent.Contact, 0)
  38. tagMap := make(map[string][]uint64)
  39. err = json.Unmarshal([]byte(batch.Tagids), &tagMap)
  40. if err != nil {
  41. continue
  42. }
  43. l.Logger.Infof("send_msg.go batch.Tagids = %v\n", tagMap)
  44. var allContact, allGroup, ok bool
  45. var contactTags, groupTags []uint64
  46. if contactTags, ok = tagMap["contact_tag"]; ok {
  47. allContact = hasAll(contactTags, 0)
  48. l.Logger.Infof("contactTags=%v \n", contactTags)
  49. }
  50. if groupTags, ok = tagMap["group_tag"]; ok {
  51. allGroup = hasAll(groupTags, 0)
  52. l.Logger.Infof("groupTags=%v \n", groupTags)
  53. }
  54. var err error
  55. if allContact && allGroup {
  56. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据
  57. userList, err = l.svcCtx.DB.Contact.Query().Where(
  58. contact.WxWxid(batch.Fromwxid),
  59. contact.TypeIn(1, 2),
  60. contact.CtypeIn(1, 3),
  61. ).All(l.ctx)
  62. if err != nil {
  63. l.Logger.Errorf("userlist err: %v \n", err)
  64. continue
  65. }
  66. } else {
  67. if allContact { // 所有联系人
  68. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1的数据
  69. userList, err = l.svcCtx.DB.Contact.Query().Where(
  70. contact.WxWxid(batch.Fromwxid),
  71. contact.TypeEQ(1),
  72. contact.CtypeIn(1, 3),
  73. ).All(l.ctx)
  74. if err != nil {
  75. l.Logger.Errorf("userList err: %v \n", err)
  76. continue
  77. }
  78. } else { //获取指定标签的联系人
  79. userList, err = l.getContactList(contactTags, batch.Fromwxid, 1)
  80. if err != nil {
  81. l.Logger.Errorf("userList err: %v \n", err)
  82. continue
  83. }
  84. }
  85. if allGroup { //所有群
  86. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为2的数据
  87. groupList, err = l.svcCtx.DB.Contact.Query().Where(
  88. contact.WxWxid(batch.Fromwxid),
  89. contact.TypeEQ(2),
  90. contact.CtypeIn(1, 3),
  91. ).All(l.ctx)
  92. if err != nil {
  93. l.Logger.Errorf("groupList err: %v \n", err)
  94. continue
  95. }
  96. } else { //获取指定标签的群
  97. groupList, err = l.getContactList(groupTags, batch.Fromwxid, 2)
  98. if err != nil {
  99. l.Logger.Errorf("groupList err: %v \n", err)
  100. continue
  101. }
  102. }
  103. if len(groupList) > 0 {
  104. userList = append(userList, groupList...)
  105. }
  106. }
  107. // 这里是待插入到 msg 表的数据
  108. msgs := make([]*ent.MsgCreate, 0)
  109. // 这里是把 batch.Msg 转换为 json 数组
  110. msgArray := make([]custom_types.Action, 0)
  111. err = json.Unmarshal([]byte(batch.Msg), &msgArray)
  112. l.Logger.Infof("msgArray length= %v, err:%v \n", len(msgArray), err)
  113. if err != nil {
  114. // json 解析失败
  115. msgArray = make([]custom_types.Action, 0)
  116. }
  117. for _, user := range userList {
  118. // 这里改动主要是 batch_msg 目前支持批量添加图文,导致 batch_msg 的 msg 字段为 json
  119. // msg 里包括文字和图片,msgtype=1 为文字, msgtype=2 为图片
  120. // 每一条文字或者图片 都是一条单独的消息
  121. if len(msgArray) > 0 {
  122. // 这里是新格式(msg内容为json),需要遍历数组
  123. for _, msgItem := range msgArray {
  124. msgRow := l.svcCtx.DB.Msg.Create().
  125. SetNotNilFromwxid(&batch.Fromwxid).
  126. SetNotNilToid(&user.Wxid).
  127. SetMsgtype(int32(msgItem.Type)).
  128. SetNotNilMsg(&msgItem.Content).
  129. SetStatus(0).
  130. SetNotNilBatchNo(&batch.BatchNo)
  131. msgs = append(msgs, msgRow)
  132. }
  133. }
  134. }
  135. if len(msgs) > 0 {
  136. // 加事务,批量操作一条 batch_msg 和 一堆 msg 信息
  137. tx, err := l.svcCtx.DB.Tx(l.ctx)
  138. if err != nil {
  139. l.Logger.Errorf("start db transaction err: %v \n", err)
  140. continue
  141. }
  142. _, err = tx.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
  143. if err != nil {
  144. _ = tx.Rollback()
  145. l.Logger.Errorf("batchmsg update err: %v \n", err)
  146. continue
  147. }
  148. _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
  149. if err != nil {
  150. _ = tx.Rollback()
  151. l.Logger.Errorf("msg CreateBulk err: %v \n", err)
  152. continue
  153. }
  154. _ = tx.Commit()
  155. } else {
  156. // 如果没有消息,直接更新批次状态为已发送
  157. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  158. SetStatus(2).
  159. SetTotal(0).
  160. SetSuccess(0).
  161. SetFail(0).
  162. Save(l.ctx)
  163. if err != nil {
  164. l.Logger.Errorf("batchmsg update err: %v \n", err)
  165. }
  166. continue
  167. }
  168. }
  169. // 获取当前批次的所有待发送消息
  170. msglist, err := l.svcCtx.DB.Msg.Query().Where(
  171. msg.BatchNoEQ(batch.BatchNo),
  172. msg.StatusEQ(0),
  173. ).All(l.ctx)
  174. if err != nil {
  175. l.Logger.Errorf("msglist err: %v \n", err)
  176. continue
  177. }
  178. wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
  179. if err != nil {
  180. l.Logger.Errorf("wxInfo err: %v \n", err)
  181. continue
  182. }
  183. privateIP := ""
  184. adminPort := ""
  185. port := ""
  186. ctype := batch.Ctype
  187. if wxInfo.ServerID != 0 {
  188. serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
  189. if err != nil {
  190. l.Logger.Errorf("serverInfo err: %v \n", err)
  191. continue
  192. }
  193. privateIP = serverInfo.PrivateIP
  194. adminPort = serverInfo.AdminPort
  195. port = wxInfo.Port
  196. }
  197. var hookClient *hook.Hook
  198. if ctype == 3 {
  199. hookClient = hook.NewWecomHook("", adminPort, port)
  200. } else {
  201. hookClient = hook.NewHook(privateIP, adminPort, port)
  202. }
  203. //循环发送消息
  204. for _, msg := range msglist {
  205. // 这里之前只有文字消息(既 msgtype=1) 目前增加了图片 所以增加了msgtype=2
  206. // 所以增加了一个判断,判断发送的内容类型,如果是文字就调用SendTextMsg,如果是图片就调用SendPicMsg
  207. if msg.Msgtype == 1 {
  208. msgId := time.Now().UnixNano() / int64(time.Microsecond)
  209. l.svcCtx.Rds.Set(l.ctx, fmt.Sprintf("MsgId_FriendId:%d", msgId), msg.Toid, 10*time.Minute)
  210. err = hookClient.SendTextMsg(msg.Toid, msg.Msg, wxInfo.Wxid, msgId)
  211. } else if msg.Msgtype == 2 {
  212. diyfilename := getFileName(msg.Msg)
  213. err = hookClient.SendPicMsg(msg.Toid, msg.Msg, diyfilename, wxInfo.Wxid)
  214. }
  215. // 每次发完暂停1秒
  216. time.Sleep(time.Second)
  217. if err != nil {
  218. l.Logger.Errorf("send msg err: %v \n", err)
  219. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
  220. if err != nil {
  221. l.Logger.Errorf("msg update err: %v \n", err)
  222. continue
  223. }
  224. continue
  225. }
  226. _, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
  227. if err != nil {
  228. l.Logger.Errorf("msg update err: %v \n", err)
  229. continue
  230. }
  231. }
  232. // 获取当前批次的所有发送的消息总数
  233. total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx)
  234. // 获取当前批次的所有发送成功的消息总数
  235. success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx)
  236. // 获取当前批次的所有发送失败的消息总数
  237. fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx)
  238. // 更新批次状态为已发送,同时更新发送总数、发送成功数量、失败数量、结束时间
  239. _, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
  240. SetStatus(2).
  241. SetTotal(int32(total)).
  242. SetSuccess(int32(success)).
  243. SetFail(int32(fail)).
  244. SetStopTime(time.Now()).
  245. Save(l.ctx)
  246. if err != nil {
  247. l.Logger.Errorf("batchmsg update err: %v \n", err)
  248. continue
  249. }
  250. l.Logger.Infof("batch stop:%s \n", batch.BatchNo)
  251. }
  252. finishTime := time.Now()
  253. l.Logger.Infof("This process cost %v \n", finishTime.Sub(startTime).String())
  254. return
  255. }
  256. // 根据URL获取图片名
  257. func getFileName(photoUrl string) string {
  258. u, err := url.Parse(photoUrl)
  259. if err != nil {
  260. return ""
  261. }
  262. return path.Base(u.Path)
  263. }
  264. func hasAll(array []uint64, target uint64) bool {
  265. for _, val := range array {
  266. if val == target {
  267. return true
  268. }
  269. }
  270. return false
  271. }
  272. func (l *CronTask) getContactList(labels []uint64, fromWxId string, stype int) ([]*ent.Contact, error) {
  273. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  274. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(
  275. labelrelationship.LabelIDIn(labels...),
  276. ).All(l.ctx)
  277. if err != nil {
  278. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  279. }
  280. contact_ids := make([]uint64, 0, len(labelrelationships))
  281. for _, labelrelationship := range labelrelationships {
  282. contact_ids = append(contact_ids, labelrelationship.ContactID)
  283. }
  284. userList := make([]*ent.Contact, 0)
  285. if len(contact_ids) > 0 {
  286. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  287. userList, err = l.svcCtx.DB.Contact.Query().Where(
  288. contact.WxWxid(fromWxId),
  289. contact.IDIn(contact_ids...),
  290. contact.TypeEQ(stype),
  291. contact.CtypeIn(1, 3),
  292. ).All(l.ctx)
  293. if err != nil {
  294. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  295. }
  296. }
  297. return userList, nil
  298. }