send_wx_on_timeout.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. package crontask
  2. import (
  3. "context"
  4. "regexp"
  5. "time"
  6. "wechat-api/ent"
  7. "wechat-api/ent/custom_types"
  8. "wechat-api/ent/labelrelationship"
  9. "wechat-api/ent/messagerecords"
  10. "wechat-api/ent/sopnode"
  11. "wechat-api/ent/sopstage"
  12. "wechat-api/ent/soptask"
  13. "wechat-api/internal/utils/dberrorhandler"
  14. )
  15. func (l *CronTask) sendWxOnTimeout() {
  16. ctx := context.Background()
  17. startTime := time.Now()
  18. sopTask, _ := l.svcCtx.DB.SopTask.Query().
  19. Where(
  20. soptask.StatusEQ(3),
  21. soptask.DeletedAtIsNil(),
  22. ).
  23. All(l.ctx)
  24. sopTasks := make([]uint64, 0)
  25. for _, st := range sopTask {
  26. sopTasks = append(sopTasks, st.ID)
  27. }
  28. sopStages, _ := l.svcCtx.DB.SopStage.Query().Where(sopstage.TaskIDIn(sopTasks...), sopstage.DeletedAtIsNil()).All(l.ctx)
  29. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  30. nodes, err := l.svcCtx.DB.SopNode.Query().
  31. Where(sopnode.NoReplyConditionNEQ(0)).
  32. Where(sopnode.HasSopStageWith(
  33. sopstage.StatusEQ(1),
  34. sopstage.DeletedAtIsNil(),
  35. sopstage.HasSopTaskWith(
  36. soptask.StatusEQ(3),
  37. soptask.DeletedAtIsNil(),
  38. ),
  39. )).
  40. All(ctx)
  41. if err != nil {
  42. l.Logger.Errorf("get node list failed %v", err)
  43. return
  44. }
  45. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  46. //parentNodes := make([]uint64, 0)
  47. //stages := make([]uint64, 0)
  48. messages := make([]*ent.MessageRecords, 0)
  49. for _, node := range nodes {
  50. var coef uint64 = 1
  51. switch node.NoReplyUnit {
  52. case "W":
  53. coef = 60 * 24 * 7
  54. case "D":
  55. coef = 60 * 24
  56. case "h":
  57. coef = 60
  58. }
  59. // 查询 node 对应的 stage 记录
  60. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
  61. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
  62. if node.ParentID == 0 {
  63. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  64. Where(messagerecords.StatusEQ(3)).
  65. Where(messagerecords.SourceTypeEQ(3)).
  66. Where(messagerecords.SourceIDEQ(node.StageID)).
  67. Where(messagerecords.SubSourceIDEQ(0)).
  68. Where(messagerecords.SendTimeGTE(lowerBound)).
  69. Where(messagerecords.SendTimeLTE(upperBound)).
  70. All(ctx)
  71. } else {
  72. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  73. Where(messagerecords.StatusEQ(3)).
  74. Where(messagerecords.SourceTypeEQ(4)).
  75. Where(messagerecords.SourceIDIn(node.ParentID)).
  76. Where(messagerecords.SubSourceIDEQ(0)).
  77. Where(messagerecords.SendTimeGTE(lowerBound)).
  78. Where(messagerecords.SendTimeLTE(upperBound)).
  79. All(ctx)
  80. }
  81. for _, s := range messages {
  82. // 判断 s.Id 是否是 s.ContactID 的最新记录
  83. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  84. Where(messagerecords.ContactIDEQ(s.ContactID)).
  85. Where(messagerecords.StatusEQ(3)).
  86. Order(ent.Desc(messagerecords.FieldCreatedAt)).
  87. First(ctx)
  88. if latest.ID == s.ID {
  89. // 创建 MessageRecords 记录
  90. if node.ActionMessage != nil {
  91. for i, c := range node.ActionMessage {
  92. meta := custom_types.Meta{}
  93. if c.Meta != nil {
  94. meta.Filename = c.Meta.Filename
  95. }
  96. _, _ = l.svcCtx.DB.MessageRecords.Create().
  97. SetStatus(1).
  98. SetBotWxid(s.BotWxid).
  99. SetContactID(s.ContactID).
  100. SetContactType(s.ContactType).
  101. SetContactWxid(s.ContactWxid).
  102. SetContentType(c.Type).
  103. SetContent(c.Content).
  104. SetMeta(meta).
  105. SetSourceType(4).
  106. SetSourceID(node.ID).
  107. SetSubSourceID(uint64(i)).
  108. SetOrganizationID(s.OrganizationID).
  109. Save(ctx)
  110. }
  111. } else {
  112. meta := custom_types.Meta{}
  113. _, _ = l.svcCtx.DB.MessageRecords.Create().
  114. SetStatus(1).
  115. SetBotWxid(s.BotWxid).
  116. SetContactID(s.ContactID).
  117. SetContactType(s.ContactType).
  118. SetContactWxid(s.ContactWxid).
  119. SetContentType(1).
  120. SetMeta(meta).
  121. SetSourceType(4).
  122. SetSourceID(node.ID).
  123. SetSubSourceID(0).
  124. SetOrganizationID(s.OrganizationID).
  125. Save(ctx)
  126. }
  127. if node.ActionForward != nil {
  128. if node.ActionForward.Wxid != "" {
  129. forwardWxids := splitString(node.ActionForward.Wxid)
  130. for _, forwardWxid := range forwardWxids {
  131. for i, message := range node.ActionForward.Action {
  132. meta := custom_types.Meta{}
  133. if message.Meta != nil {
  134. meta.Filename = message.Meta.Filename
  135. }
  136. _, _ = l.svcCtx.DB.MessageRecords.Create().
  137. SetBotWxid(s.BotWxid).
  138. SetContactID(0).
  139. SetContactType(0).
  140. SetContactWxid(forwardWxid).
  141. SetContentType(message.Type).
  142. SetContent(message.Content).
  143. SetMeta(meta).
  144. SetSourceType(4).
  145. SetSourceID(node.ID).
  146. SetSubSourceID(s.ContactID + uint64(i)).
  147. SetOrganizationID(s.OrganizationID).
  148. Save(l.ctx)
  149. }
  150. }
  151. }
  152. }
  153. // 查询当前联系人的标签关系
  154. currentLabelRelationships, _ := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.ContactID(s.ContactID)).All(l.ctx)
  155. if currentLabelRelationships == nil {
  156. continue
  157. }
  158. // 提取当前标签ID
  159. var currentLabelIds []uint64
  160. for _, relationship := range currentLabelRelationships {
  161. currentLabelIds = append(currentLabelIds, relationship.LabelID)
  162. }
  163. contact := &ent.Contact{
  164. ID: s.ContactID,
  165. Type: s.ContentType,
  166. WxWxid: s.BotWxid,
  167. Wxid: s.ContactWxid,
  168. }
  169. if node.ActionLabelAdd != nil || node.ActionLabelDel != nil {
  170. _ = l.AddLabelRelationships(sopStages, *contact, currentLabelIds, node.ActionLabelAdd, node.ActionLabelDel, s.OrganizationID)
  171. }
  172. }
  173. }
  174. }
  175. finishTime := time.Now()
  176. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  177. return
  178. }
  179. func splitString(input string) []string {
  180. // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma
  181. pattern := `[,,、]`
  182. re := regexp.MustCompile(pattern)
  183. // Split the input string based on the pattern
  184. return re.Split(input, -1)
  185. }
  186. func (l *CronTask) AddLabelRelationships(sopStages []*ent.SopStage, contact ent.Contact, currentLabelIds []uint64, addLabelIds []uint64, delLabelIds []uint64, organizationId uint64) (err error) {
  187. //// 开始事务
  188. //tx, err := l.svcCtx.DB.Tx(context.Background())
  189. //if err != nil {
  190. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  191. //}
  192. // 获取 addLabelIds 中不在 currentLabelIds 中的标签ID
  193. var newLabelIds []uint64
  194. var remLabelIds []uint64
  195. var finalLabelIds []uint64
  196. // 创建一个映射,用于快速查找 currentLabelIds 中的元素
  197. currentLabelIdSet := make(map[uint64]struct{})
  198. for _, id := range currentLabelIds {
  199. currentLabelIdSet[id] = struct{}{}
  200. }
  201. delLabelIdSet := make(map[uint64]struct{})
  202. for _, id := range delLabelIds {
  203. delLabelIdSet[id] = struct{}{}
  204. }
  205. if addLabelIds != nil {
  206. // 遍历 addLabelIds,找出不在 currentLabelIds 中的元素
  207. for _, id := range addLabelIds {
  208. if _, ce := currentLabelIdSet[id]; !ce {
  209. if _, re := delLabelIdSet[id]; !re {
  210. newLabelIds = append(newLabelIds, id)
  211. }
  212. }
  213. }
  214. if len(newLabelIds) > 0 {
  215. // 创建需要新增的标签关系
  216. for _, id := range newLabelIds {
  217. currentLabelIdSet[id] = struct{}{}
  218. _, err = l.svcCtx.DB.LabelRelationship.Create().
  219. SetLabelID(id).
  220. SetContactID(contact.ID).
  221. SetOrganizationID(organizationId).
  222. Save(l.ctx)
  223. if err != nil {
  224. //_ = tx.Rollback()
  225. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  226. }
  227. }
  228. // 合并 currentLabelIds 和 newLabelIds
  229. currentLabelIds = append(currentLabelIds, newLabelIds...)
  230. }
  231. }
  232. if delLabelIds != nil {
  233. // 遍历 delLabelIds,找出在 currentLabelIds 中的元素
  234. for _, id := range delLabelIds {
  235. if _, exists := currentLabelIdSet[id]; exists {
  236. remLabelIds = append(newLabelIds, id)
  237. delete(currentLabelIdSet, id)
  238. }
  239. }
  240. if len(remLabelIds) > 0 {
  241. _, err = l.svcCtx.DB.LabelRelationship.Delete().Where(labelrelationship.LabelIDIn(remLabelIds...), labelrelationship.ContactIDEQ(contact.ID), labelrelationship.OrganizationIDEQ(organizationId)).Exec(l.ctx)
  242. if err != nil {
  243. //_ = tx.Rollback()
  244. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  245. }
  246. }
  247. }
  248. if len(newLabelIds) == 0 && len(remLabelIds) == 0 {
  249. return nil
  250. }
  251. for id := range currentLabelIdSet {
  252. finalLabelIds = append(finalLabelIds, id)
  253. }
  254. // 遍历 sop_stages,找出满足条件的 stage
  255. for _, stage := range sopStages {
  256. if stage.ConditionType == 1 && isLabelIdListMatchFilter(finalLabelIds, stage.ConditionOperator, stage.ConditionList) {
  257. // 判断是否有 contact_wxid、source_type、source_id、sub_source_id 相同的记录
  258. _, err := l.svcCtx.DB.MessageRecords.Query().
  259. Where(
  260. messagerecords.ContactWxid(contact.Wxid),
  261. messagerecords.SourceType(3),
  262. messagerecords.SourceID(stage.ID),
  263. messagerecords.SubSourceID(0),
  264. ).
  265. Only(l.ctx)
  266. if !ent.IsNotFound(err) {
  267. continue
  268. }
  269. // 判断ActionMessage是否为空
  270. sourceType := 3
  271. if stage.ActionMessage != nil {
  272. for i, message := range stage.ActionMessage {
  273. meta := custom_types.Meta{}
  274. if message.Meta != nil {
  275. meta.Filename = message.Meta.Filename
  276. }
  277. _, _ = l.svcCtx.DB.MessageRecords.Create().
  278. SetNotNilBotWxid(&contact.WxWxid).
  279. SetNotNilContactID(&contact.ID).
  280. SetNotNilContactType(&contact.Type).
  281. SetNotNilContactWxid(&contact.Wxid).
  282. SetNotNilContentType(&message.Type).
  283. SetNotNilContent(&message.Content).
  284. SetMeta(meta).
  285. SetNotNilSourceType(&sourceType).
  286. SetNotNilSourceID(&stage.ID).
  287. SetSubSourceID(uint64(i)).
  288. SetOrganizationID(organizationId).
  289. Save(l.ctx)
  290. //if err != nil {
  291. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  292. //}
  293. }
  294. }
  295. if stage.ActionForward != nil {
  296. if stage.ActionForward.Wxid != "" {
  297. forwardWxids := splitString(stage.ActionForward.Wxid)
  298. for _, forwardWxid := range forwardWxids {
  299. for i, message := range stage.ActionForward.Action {
  300. meta := custom_types.Meta{}
  301. if message.Meta != nil {
  302. meta.Filename = message.Meta.Filename
  303. }
  304. _, err = l.svcCtx.DB.MessageRecords.Create().
  305. SetBotWxid(contact.WxWxid).
  306. SetContactID(0).
  307. SetContactType(0).
  308. SetContactWxid(forwardWxid).
  309. SetContentType(message.Type).
  310. SetContent(message.Content).
  311. SetMeta(meta).
  312. SetSourceType(sourceType).
  313. SetSourceID(stage.ID).
  314. SetSubSourceID(contact.ID + uint64(i)).
  315. SetOrganizationID(organizationId).
  316. Save(l.ctx)
  317. }
  318. }
  319. }
  320. }
  321. if stage.ActionLabelAdd != nil || stage.ActionLabelDel != nil {
  322. // 递归调用 AddLabelRelationships
  323. err = l.AddLabelRelationships(sopStages, contact, finalLabelIds, stage.ActionLabelAdd, stage.ActionLabelDel, organizationId)
  324. if err != nil {
  325. //_ = tx.Rollback()
  326. return err
  327. }
  328. }
  329. }
  330. }
  331. // 所有操作成功,提交事务
  332. //err = tx.Commit()
  333. //if err != nil {
  334. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  335. //}
  336. return nil
  337. }
  338. func isLabelIdListMatchFilter(labelIdList []uint64, conditionOperator int, conditionList []custom_types.Condition) bool {
  339. labelIdSet := make(map[uint64]struct{})
  340. for _, id := range labelIdList {
  341. labelIdSet[id] = struct{}{}
  342. }
  343. for _, condition := range conditionList {
  344. match := false
  345. for _, id := range condition.LabelIdList {
  346. if _, ok := labelIdSet[id]; ok {
  347. match = true
  348. break
  349. }
  350. }
  351. if condition.Equal == 2 {
  352. match = !match
  353. }
  354. if conditionOperator == 1 && !match {
  355. return false
  356. } else if conditionOperator == 2 && match {
  357. return true
  358. }
  359. }
  360. return conditionOperator == 1
  361. }