send_wx_on_timeout.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package crontask
  2. import (
  3. "context"
  4. "regexp"
  5. "time"
  6. "wechat-api/ent"
  7. "wechat-api/ent/custom_types"
  8. "wechat-api/ent/messagerecords"
  9. "wechat-api/ent/sopnode"
  10. "wechat-api/ent/sopstage"
  11. "wechat-api/ent/soptask"
  12. )
  13. func (l *CronTask) sendWxOnTimeout() {
  14. ctx := context.Background()
  15. startTime := time.Now()
  16. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  17. nodes, err := l.svcCtx.DB.SopNode.Query().
  18. Where(sopnode.NoReplyConditionNEQ(0)).
  19. Where(sopnode.HasSopStageWith(
  20. sopstage.StatusEQ(1),
  21. sopstage.DeletedAtIsNil(),
  22. sopstage.HasSopTaskWith(
  23. soptask.StatusEQ(3),
  24. soptask.DeletedAtIsNil(),
  25. ),
  26. )).
  27. All(ctx)
  28. if err != nil {
  29. l.Logger.Errorf("get node list failed %v", err)
  30. return
  31. }
  32. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  33. //parentNodes := make([]uint64, 0)
  34. //stages := make([]uint64, 0)
  35. messages := make([]*ent.MessageRecords, 0)
  36. for _, node := range nodes {
  37. var coef uint64 = 1
  38. switch node.NoReplyUnit {
  39. case "W":
  40. coef = 60 * 24 * 7
  41. case "D":
  42. coef = 60 * 24
  43. case "h":
  44. coef = 60
  45. }
  46. // 查询 node 对应的 stage 记录
  47. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
  48. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
  49. l.Logger.Errorf("------------------------lowerBound--------------------------- %+v", lowerBound)
  50. l.Logger.Errorf("------------------------now--------------------------- %+v", time.Now())
  51. if node.ParentID == 0 {
  52. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  53. Where(messagerecords.StatusEQ(3)).
  54. Where(messagerecords.SourceTypeEQ(3)).
  55. Where(messagerecords.SourceIDEQ(node.StageID)).
  56. Where(messagerecords.SubSourceIDEQ(0)).
  57. Where(messagerecords.SendTimeGTE(lowerBound)).
  58. Where(messagerecords.SendTimeLTE(upperBound)).
  59. All(ctx)
  60. } else {
  61. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  62. Where(messagerecords.StatusEQ(3)).
  63. Where(messagerecords.SourceTypeEQ(4)).
  64. Where(messagerecords.SourceIDIn(node.ParentID)).
  65. Where(messagerecords.SubSourceIDEQ(0)).
  66. Where(messagerecords.SendTimeGTE(lowerBound)).
  67. Where(messagerecords.SendTimeLTE(upperBound)).
  68. All(ctx)
  69. }
  70. for _, s := range messages {
  71. // 判断 s.Id 是否是 s.ContactID 的最新记录
  72. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  73. Where(messagerecords.ContactIDEQ(s.ContactID)).
  74. Where(messagerecords.StatusEQ(3)).
  75. Order(ent.Desc(messagerecords.FieldCreatedAt)).
  76. First(ctx)
  77. if latest.ID == s.ID {
  78. // 创建 MessageRecords 记录
  79. if node.ActionMessage != nil {
  80. for i, c := range node.ActionMessage {
  81. meta := custom_types.Meta{}
  82. if c.Meta != nil {
  83. meta.Filename = c.Meta.Filename
  84. }
  85. _, _ = l.svcCtx.DB.MessageRecords.Create().
  86. SetStatus(1).
  87. SetBotWxid(s.BotWxid).
  88. SetContactID(s.ContactID).
  89. SetContactType(s.ContactType).
  90. SetContactWxid(s.ContactWxid).
  91. SetContentType(c.Type).
  92. SetContent(c.Content).
  93. SetMeta(meta).
  94. SetSourceType(4).
  95. SetSourceID(node.ID).
  96. SetSubSourceID(uint64(i)).
  97. SetOrganizationID(s.OrganizationID).
  98. Save(ctx)
  99. }
  100. } else {
  101. meta := custom_types.Meta{}
  102. _, _ = l.svcCtx.DB.MessageRecords.Create().
  103. SetStatus(1).
  104. SetBotWxid(s.BotWxid).
  105. SetContactID(s.ContactID).
  106. SetContactType(s.ContactType).
  107. SetContactWxid(s.ContactWxid).
  108. SetContentType(1).
  109. SetMeta(meta).
  110. SetSourceType(4).
  111. SetSourceID(node.ID).
  112. SetSubSourceID(0).
  113. SetOrganizationID(s.OrganizationID).
  114. Save(ctx)
  115. }
  116. if node.ActionForward != nil {
  117. if node.ActionForward.Wxid != "" {
  118. forwardWxids := splitString(node.ActionForward.Wxid)
  119. for _, forwardWxid := range forwardWxids {
  120. for i, message := range node.ActionForward.Action {
  121. meta := custom_types.Meta{}
  122. if message.Meta != nil {
  123. meta.Filename = message.Meta.Filename
  124. }
  125. _, _ = l.svcCtx.DB.MessageRecords.Create().
  126. SetBotWxid(s.BotWxid).
  127. SetContactID(0).
  128. SetContactType(0).
  129. SetContactWxid(forwardWxid).
  130. SetContentType(message.Type).
  131. SetContent(message.Content).
  132. SetMeta(meta).
  133. SetSourceType(4).
  134. SetSourceID(node.ID).
  135. SetSubSourceID(s.ContactID + uint64(i)).
  136. SetOrganizationID(s.OrganizationID).
  137. Save(l.ctx)
  138. }
  139. }
  140. }
  141. }
  142. }
  143. }
  144. }
  145. finishTime := time.Now()
  146. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  147. return
  148. }
  149. func splitString(input string) []string {
  150. // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma
  151. pattern := `[,,、]`
  152. re := regexp.MustCompile(pattern)
  153. // Split the input string based on the pattern
  154. return re.Split(input, -1)
  155. }