send_wx_on_timeout.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package crontask
  2. import (
  3. "context"
  4. "time"
  5. "wechat-api/ent"
  6. "wechat-api/ent/custom_types"
  7. "wechat-api/ent/messagerecords"
  8. "wechat-api/ent/sopnode"
  9. "wechat-api/ent/sopstage"
  10. "wechat-api/ent/soptask"
  11. )
  12. func (l *CronTask) sendWxOnTimeout() {
  13. ctx := context.Background()
  14. startTime := time.Now()
  15. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  16. nodes, err := l.svcCtx.DB.SopNode.Query().
  17. Where(sopnode.NoReplyConditionNEQ(0)).
  18. Where(sopnode.HasSopStageWith(
  19. sopstage.StatusEQ(1),
  20. sopstage.DeletedAtIsNil(),
  21. sopstage.HasSopTaskWith(
  22. soptask.StatusEQ(3),
  23. soptask.DeletedAtIsNil(),
  24. ),
  25. )).
  26. All(ctx)
  27. if err != nil {
  28. l.Logger.Errorf("get node list failed %v", err)
  29. return
  30. }
  31. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  32. //parentNodes := make([]uint64, 0)
  33. //stages := make([]uint64, 0)
  34. messages := make([]*ent.MessageRecords, 0)
  35. for _, node := range nodes {
  36. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition+2))
  37. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition))
  38. if node.ParentID == 0 {
  39. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  40. Where(messagerecords.StatusEQ(3)).
  41. Where(messagerecords.SourceTypeEQ(3)).
  42. Where(messagerecords.SourceIDEQ(node.StageID)).
  43. Where(messagerecords.SubSourceIDEQ(0)).
  44. Where(messagerecords.SendTimeGTE(lowerBound)).
  45. Where(messagerecords.SendTimeLTE(upperBound)).
  46. All(ctx)
  47. } else {
  48. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  49. Where(messagerecords.StatusEQ(3)).
  50. Where(messagerecords.SourceTypeEQ(4)).
  51. Where(messagerecords.SourceIDIn(node.ParentID)).
  52. Where(messagerecords.SubSourceIDEQ(0)).
  53. Where(messagerecords.SendTimeGTE(lowerBound)).
  54. Where(messagerecords.SendTimeLTE(upperBound)).
  55. All(ctx)
  56. }
  57. for _, s := range messages {
  58. // 判断 s.Id 是否是 s.ContactID 的最新记录
  59. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  60. Where(messagerecords.ContactIDEQ(s.ContactID)).
  61. Where(messagerecords.StatusEQ(3)).
  62. Order(ent.Desc(messagerecords.FieldCreatedAt)).
  63. First(ctx)
  64. if latest.ID == s.ID {
  65. // 创建 MessageRecords 记录
  66. for i, c := range node.ActionMessage {
  67. meta := custom_types.Meta{}
  68. if c.Meta != nil {
  69. meta.Filename = c.Meta.Filename
  70. }
  71. _, _ = l.svcCtx.DB.MessageRecords.Create().
  72. SetStatus(1).
  73. SetBotWxid(s.BotWxid).
  74. SetContactID(s.ContactID).
  75. SetContactType(s.ContactType).
  76. SetContactWxid(s.ContactWxid).
  77. SetContentType(c.Type).
  78. SetContent(c.Content).
  79. SetMeta(meta).
  80. SetSourceType(4).
  81. SetSourceID(node.ID).
  82. SetSubSourceID(uint64(i)).
  83. Save(ctx)
  84. }
  85. }
  86. }
  87. }
  88. finishTime := time.Now()
  89. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  90. return
  91. }