send_wx_on_timeout.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package crontask
  2. import (
  3. "context"
  4. "regexp"
  5. "time"
  6. "wechat-api/ent"
  7. "wechat-api/ent/custom_types"
  8. "wechat-api/ent/messagerecords"
  9. "wechat-api/ent/sopnode"
  10. "wechat-api/ent/sopstage"
  11. "wechat-api/ent/soptask"
  12. )
  13. func (l *CronTask) sendWxOnTimeout() {
  14. ctx := context.Background()
  15. startTime := time.Now()
  16. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  17. nodes, err := l.svcCtx.DB.SopNode.Query().
  18. Where(sopnode.NoReplyConditionNEQ(0)).
  19. Where(sopnode.HasSopStageWith(
  20. sopstage.StatusEQ(1),
  21. sopstage.DeletedAtIsNil(),
  22. sopstage.HasSopTaskWith(
  23. soptask.StatusEQ(3),
  24. soptask.DeletedAtIsNil(),
  25. ),
  26. )).
  27. All(ctx)
  28. if err != nil {
  29. l.Logger.Errorf("get node list failed %v", err)
  30. return
  31. }
  32. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  33. //parentNodes := make([]uint64, 0)
  34. //stages := make([]uint64, 0)
  35. messages := make([]*ent.MessageRecords, 0)
  36. for _, node := range nodes {
  37. var coef uint64 = 1
  38. switch node.NoReplyUnit {
  39. case "W":
  40. coef = 60 * 24 * 7
  41. case "D":
  42. coef = 60 * 24
  43. case "h":
  44. coef = 60
  45. }
  46. // 查询 node 对应的 stage 记录
  47. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
  48. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
  49. if node.ParentID == 0 {
  50. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  51. Where(messagerecords.StatusEQ(3)).
  52. Where(messagerecords.SourceTypeEQ(3)).
  53. Where(messagerecords.SourceIDEQ(node.StageID)).
  54. Where(messagerecords.SubSourceIDEQ(0)).
  55. Where(messagerecords.SendTimeGTE(lowerBound)).
  56. Where(messagerecords.SendTimeLTE(upperBound)).
  57. All(ctx)
  58. } else {
  59. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  60. Where(messagerecords.StatusEQ(3)).
  61. Where(messagerecords.SourceTypeEQ(4)).
  62. Where(messagerecords.SourceIDIn(node.ParentID)).
  63. Where(messagerecords.SubSourceIDEQ(0)).
  64. Where(messagerecords.SendTimeGTE(lowerBound)).
  65. Where(messagerecords.SendTimeLTE(upperBound)).
  66. All(ctx)
  67. }
  68. for _, s := range messages {
  69. // 判断 s.Id 是否是 s.ContactID 的最新记录
  70. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  71. Where(messagerecords.ContactIDEQ(s.ContactID)).
  72. Where(messagerecords.StatusEQ(3)).
  73. Order(ent.Desc(messagerecords.FieldCreatedAt)).
  74. First(ctx)
  75. if latest.ID == s.ID {
  76. // 创建 MessageRecords 记录
  77. if node.ActionMessage != nil {
  78. for i, c := range node.ActionMessage {
  79. meta := custom_types.Meta{}
  80. if c.Meta != nil {
  81. meta.Filename = c.Meta.Filename
  82. }
  83. _, _ = l.svcCtx.DB.MessageRecords.Create().
  84. SetStatus(1).
  85. SetBotWxid(s.BotWxid).
  86. SetContactID(s.ContactID).
  87. SetContactType(s.ContactType).
  88. SetContactWxid(s.ContactWxid).
  89. SetContentType(c.Type).
  90. SetContent(c.Content).
  91. SetMeta(meta).
  92. SetSourceType(4).
  93. SetSourceID(node.ID).
  94. SetSubSourceID(uint64(i)).
  95. SetOrganizationID(s.OrganizationID).
  96. Save(ctx)
  97. }
  98. } else {
  99. meta := custom_types.Meta{}
  100. _, _ = l.svcCtx.DB.MessageRecords.Create().
  101. SetStatus(1).
  102. SetBotWxid(s.BotWxid).
  103. SetContactID(s.ContactID).
  104. SetContactType(s.ContactType).
  105. SetContactWxid(s.ContactWxid).
  106. SetContentType(1).
  107. SetMeta(meta).
  108. SetSourceType(4).
  109. SetSourceID(node.ID).
  110. SetSubSourceID(0).
  111. SetOrganizationID(s.OrganizationID).
  112. Save(ctx)
  113. }
  114. if node.ActionForward != nil {
  115. if node.ActionForward.Wxid != "" {
  116. forwardWxids := splitString(node.ActionForward.Wxid)
  117. for _, forwardWxid := range forwardWxids {
  118. for i, message := range node.ActionForward.Action {
  119. meta := custom_types.Meta{}
  120. if message.Meta != nil {
  121. meta.Filename = message.Meta.Filename
  122. }
  123. _, _ = l.svcCtx.DB.MessageRecords.Create().
  124. SetBotWxid(s.BotWxid).
  125. SetContactID(0).
  126. SetContactType(0).
  127. SetContactWxid(forwardWxid).
  128. SetContentType(message.Type).
  129. SetContent(message.Content).
  130. SetMeta(meta).
  131. SetSourceType(4).
  132. SetSourceID(node.ID).
  133. SetSubSourceID(s.ContactID + uint64(i)).
  134. SetOrganizationID(s.OrganizationID).
  135. Save(l.ctx)
  136. }
  137. }
  138. }
  139. }
  140. }
  141. }
  142. }
  143. finishTime := time.Now()
  144. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  145. return
  146. }
  147. func splitString(input string) []string {
  148. // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma
  149. pattern := `[,,、]`
  150. re := regexp.MustCompile(pattern)
  151. // Split the input string based on the pattern
  152. return re.Split(input, -1)
  153. }