send_wx_on_timeout.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. package crontask
  2. import (
  3. "context"
  4. "regexp"
  5. "time"
  6. "wechat-api/ent"
  7. "wechat-api/ent/custom_types"
  8. "wechat-api/ent/labelrelationship"
  9. "wechat-api/ent/messagerecords"
  10. "wechat-api/ent/sopnode"
  11. "wechat-api/ent/sopstage"
  12. "wechat-api/ent/soptask"
  13. "wechat-api/internal/utils/dberrorhandler"
  14. )
  15. func (l *CronTask) sendWxOnTimeout() {
  16. ctx := context.Background()
  17. startTime := time.Now()
  18. sopTask, _ := l.svcCtx.DB.SopTask.Query().
  19. Where(
  20. soptask.StatusEQ(3),
  21. soptask.DeletedAtIsNil(),
  22. ).
  23. All(l.ctx)
  24. sopTasks := make([]uint64, 0)
  25. for _, st := range sopTask {
  26. sopTasks = append(sopTasks, st.ID)
  27. }
  28. sopStages, _ := l.svcCtx.DB.SopStage.Query().Where(sopstage.TaskIDIn(sopTasks...), sopstage.DeletedAtIsNil()).All(l.ctx)
  29. stageMap := make(map[uint64]*ent.SopStage)
  30. for _, stage := range sopStages {
  31. stageMap[stage.ID] = stage
  32. }
  33. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  34. nodes, err := l.svcCtx.DB.SopNode.Query().
  35. Where(sopnode.NoReplyConditionNEQ(0)).
  36. Where(sopnode.HasSopStageWith(
  37. sopstage.StatusEQ(1),
  38. sopstage.DeletedAtIsNil(),
  39. sopstage.HasSopTaskWith(
  40. soptask.StatusEQ(3),
  41. soptask.DeletedAtIsNil(),
  42. ),
  43. )).
  44. All(ctx)
  45. if err != nil {
  46. l.Logger.Errorf("get node list failed %v", err)
  47. return
  48. }
  49. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  50. //parentNodes := make([]uint64, 0)
  51. //stages := make([]uint64, 0)
  52. messages := make([]*ent.MessageRecords, 0)
  53. for _, node := range nodes {
  54. var coef uint64 = 1
  55. switch node.NoReplyUnit {
  56. case "W":
  57. coef = 60 * 24 * 7
  58. case "D":
  59. coef = 60 * 24
  60. case "h":
  61. coef = 60
  62. }
  63. // 查询 node 对应的 stage 记录
  64. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
  65. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
  66. if node.ParentID == 0 {
  67. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  68. Where(messagerecords.StatusEQ(3)).
  69. Where(messagerecords.SourceTypeEQ(3)).
  70. Where(messagerecords.SourceIDEQ(node.StageID)).
  71. Where(messagerecords.SubSourceIDEQ(0)).
  72. Where(messagerecords.SendTimeGTE(lowerBound)).
  73. Where(messagerecords.SendTimeLTE(upperBound)).
  74. All(ctx)
  75. } else {
  76. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  77. Where(messagerecords.StatusEQ(3)).
  78. Where(messagerecords.SourceTypeEQ(4)).
  79. Where(messagerecords.SourceIDIn(node.ParentID)).
  80. Where(messagerecords.SubSourceIDEQ(0)).
  81. Where(messagerecords.SendTimeGTE(lowerBound)).
  82. Where(messagerecords.SendTimeLTE(upperBound)).
  83. All(ctx)
  84. }
  85. for _, s := range messages {
  86. // 判断 s.Id 是否是 s.ContactID 的最新记录
  87. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  88. Where(messagerecords.ContactIDEQ(s.ContactID)).
  89. Where(messagerecords.StatusEQ(3)).
  90. Order(ent.Desc(messagerecords.FieldCreatedAt)).
  91. First(ctx)
  92. if latest.ID == s.ID {
  93. // 创建 MessageRecords 记录
  94. if node.ActionMessage != nil {
  95. for i, c := range node.ActionMessage {
  96. meta := custom_types.Meta{}
  97. if c.Meta != nil {
  98. meta.Filename = c.Meta.Filename
  99. }
  100. _, _ = l.svcCtx.DB.MessageRecords.Create().
  101. SetStatus(1).
  102. SetBotWxid(s.BotWxid).
  103. SetContactID(s.ContactID).
  104. SetContactType(s.ContactType).
  105. SetContactWxid(s.ContactWxid).
  106. SetContentType(c.Type).
  107. SetContent(c.Content).
  108. SetMeta(meta).
  109. SetSourceType(4).
  110. SetSourceID(node.ID).
  111. SetSubSourceID(uint64(i)).
  112. SetOrganizationID(s.OrganizationID).
  113. Save(ctx)
  114. }
  115. } else {
  116. meta := custom_types.Meta{}
  117. _, _ = l.svcCtx.DB.MessageRecords.Create().
  118. SetStatus(1).
  119. SetBotWxid(s.BotWxid).
  120. SetContactID(s.ContactID).
  121. SetContactType(s.ContactType).
  122. SetContactWxid(s.ContactWxid).
  123. SetContentType(1).
  124. SetMeta(meta).
  125. SetSourceType(4).
  126. SetSourceID(node.ID).
  127. SetSubSourceID(0).
  128. SetOrganizationID(s.OrganizationID).
  129. Save(ctx)
  130. }
  131. if node.ActionForward != nil {
  132. if node.ActionForward.Wxid != "" {
  133. forwardWxids := splitString(node.ActionForward.Wxid)
  134. for _, forwardWxid := range forwardWxids {
  135. for i, message := range node.ActionForward.Action {
  136. meta := custom_types.Meta{}
  137. if message.Meta != nil {
  138. meta.Filename = message.Meta.Filename
  139. }
  140. _, _ = l.svcCtx.DB.MessageRecords.Create().
  141. SetBotWxid(s.BotWxid).
  142. SetContactID(0).
  143. SetContactType(0).
  144. SetContactWxid(forwardWxid).
  145. SetContentType(message.Type).
  146. SetContent(message.Content).
  147. SetMeta(meta).
  148. SetSourceType(4).
  149. SetSourceID(node.ID).
  150. SetSubSourceID(s.ContactID + uint64(i)).
  151. SetOrganizationID(s.OrganizationID).
  152. Save(l.ctx)
  153. }
  154. }
  155. }
  156. }
  157. // 查询当前联系人的标签关系
  158. currentLabelRelationships, _ := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.ContactID(s.ContactID)).All(l.ctx)
  159. if currentLabelRelationships == nil {
  160. continue
  161. }
  162. // 提取当前标签ID
  163. var currentLabelIds []uint64
  164. for _, relationship := range currentLabelRelationships {
  165. currentLabelIds = append(currentLabelIds, relationship.LabelID)
  166. }
  167. contact := &ent.Contact{
  168. ID: s.ContactID,
  169. Type: s.ContentType,
  170. WxWxid: s.BotWxid,
  171. Wxid: s.ContactWxid,
  172. }
  173. if node.ActionLabelAdd != nil || node.ActionLabelDel != nil {
  174. _ = l.AddLabelRelationships(stageMap, *contact, currentLabelIds, node.ActionLabelAdd, node.ActionLabelDel, s.OrganizationID)
  175. }
  176. }
  177. }
  178. }
  179. finishTime := time.Now()
  180. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  181. return
  182. }
  183. func splitString(input string) []string {
  184. // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma
  185. pattern := `[,,、]`
  186. re := regexp.MustCompile(pattern)
  187. // Split the input string based on the pattern
  188. return re.Split(input, -1)
  189. }
  190. func (l *CronTask) AddLabelRelationships(sopStages map[uint64]*ent.SopStage, contact ent.Contact, currentLabelIds []uint64, addLabelIds []uint64, delLabelIds []uint64, organizationId uint64) (err error) {
  191. //// 开始事务
  192. //tx, err := l.svcCtx.DB.Tx(context.Background())
  193. //if err != nil {
  194. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  195. //}
  196. // 获取 addLabelIds 中不在 currentLabelIds 中的标签ID
  197. var newLabelIds []uint64
  198. var remLabelIds []uint64
  199. var finalLabelIds []uint64
  200. // 创建一个映射,用于快速查找 currentLabelIds 中的元素
  201. currentLabelIdSet := make(map[uint64]struct{})
  202. for _, id := range currentLabelIds {
  203. currentLabelIdSet[id] = struct{}{}
  204. }
  205. delLabelIdSet := make(map[uint64]struct{})
  206. for _, id := range delLabelIds {
  207. delLabelIdSet[id] = struct{}{}
  208. }
  209. if addLabelIds != nil {
  210. // 遍历 addLabelIds,找出不在 currentLabelIds 中的元素
  211. for _, id := range addLabelIds {
  212. if _, ce := currentLabelIdSet[id]; !ce {
  213. if _, re := delLabelIdSet[id]; !re {
  214. newLabelIds = append(newLabelIds, id)
  215. }
  216. }
  217. }
  218. if len(newLabelIds) > 0 {
  219. // 创建需要新增的标签关系
  220. for _, id := range newLabelIds {
  221. currentLabelIdSet[id] = struct{}{}
  222. _, err = l.svcCtx.DB.LabelRelationship.Create().
  223. SetLabelID(id).
  224. SetContactID(contact.ID).
  225. SetOrganizationID(organizationId).
  226. Save(l.ctx)
  227. if err != nil {
  228. //_ = tx.Rollback()
  229. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  230. }
  231. }
  232. // 合并 currentLabelIds 和 newLabelIds
  233. currentLabelIds = append(currentLabelIds, newLabelIds...)
  234. }
  235. }
  236. if delLabelIds != nil {
  237. // 遍历 delLabelIds,找出在 currentLabelIds 中的元素
  238. for _, id := range delLabelIds {
  239. if _, exists := currentLabelIdSet[id]; exists {
  240. remLabelIds = append(newLabelIds, id)
  241. delete(currentLabelIdSet, id)
  242. }
  243. }
  244. if len(remLabelIds) > 0 {
  245. _, err = l.svcCtx.DB.LabelRelationship.Delete().Where(labelrelationship.LabelIDIn(remLabelIds...), labelrelationship.ContactIDEQ(contact.ID), labelrelationship.OrganizationIDEQ(organizationId)).Exec(l.ctx)
  246. if err != nil {
  247. //_ = tx.Rollback()
  248. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  249. }
  250. }
  251. }
  252. if len(newLabelIds) == 0 && len(remLabelIds) == 0 {
  253. return nil
  254. }
  255. for id := range currentLabelIdSet {
  256. finalLabelIds = append(finalLabelIds, id)
  257. }
  258. // 遍历 sop_stages,找出满足条件的 stage
  259. for key, stage := range sopStages {
  260. if stage != nil && stage.ConditionType == 1 && isLabelIdListMatchFilter(finalLabelIds, stage.ConditionOperator, stage.ConditionList) {
  261. // 判断是否有 contact_wxid、source_type、source_id、sub_source_id 相同的记录
  262. _, err := l.svcCtx.DB.MessageRecords.Query().
  263. Where(
  264. messagerecords.ContactWxid(contact.Wxid),
  265. messagerecords.SourceType(3),
  266. messagerecords.SourceID(stage.ID),
  267. messagerecords.SubSourceID(0),
  268. ).
  269. Only(l.ctx)
  270. if !ent.IsNotFound(err) {
  271. continue
  272. }
  273. // 判断ActionMessage是否为空
  274. sourceType := 3
  275. if stage.ActionMessage != nil {
  276. for i, message := range stage.ActionMessage {
  277. meta := custom_types.Meta{}
  278. if message.Meta != nil {
  279. meta.Filename = message.Meta.Filename
  280. }
  281. _, _ = l.svcCtx.DB.MessageRecords.Create().
  282. SetNotNilBotWxid(&contact.WxWxid).
  283. SetNotNilContactID(&contact.ID).
  284. SetNotNilContactType(&contact.Type).
  285. SetNotNilContactWxid(&contact.Wxid).
  286. SetNotNilContentType(&message.Type).
  287. SetNotNilContent(&message.Content).
  288. SetMeta(meta).
  289. SetNotNilSourceType(&sourceType).
  290. SetNotNilSourceID(&stage.ID).
  291. SetSubSourceID(uint64(i)).
  292. SetOrganizationID(organizationId).
  293. Save(l.ctx)
  294. //if err != nil {
  295. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  296. //}
  297. }
  298. }
  299. if stage.ActionForward != nil {
  300. if stage.ActionForward.Wxid != "" {
  301. forwardWxids := splitString(stage.ActionForward.Wxid)
  302. for _, forwardWxid := range forwardWxids {
  303. for i, message := range stage.ActionForward.Action {
  304. meta := custom_types.Meta{}
  305. if message.Meta != nil {
  306. meta.Filename = message.Meta.Filename
  307. }
  308. _, err = l.svcCtx.DB.MessageRecords.Create().
  309. SetBotWxid(contact.WxWxid).
  310. SetContactID(0).
  311. SetContactType(0).
  312. SetContactWxid(forwardWxid).
  313. SetContentType(message.Type).
  314. SetContent(message.Content).
  315. SetMeta(meta).
  316. SetSourceType(sourceType).
  317. SetSourceID(stage.ID).
  318. SetSubSourceID(contact.ID + uint64(i)).
  319. SetOrganizationID(organizationId).
  320. Save(l.ctx)
  321. }
  322. }
  323. }
  324. }
  325. if stage.ActionLabelAdd != nil || stage.ActionLabelDel != nil {
  326. // 递归调用 AddLabelRelationships
  327. sopStages[key] = nil
  328. err = l.AddLabelRelationships(sopStages, contact, finalLabelIds, stage.ActionLabelAdd, stage.ActionLabelDel, organizationId)
  329. if err != nil {
  330. //_ = tx.Rollback()
  331. return err
  332. }
  333. }
  334. }
  335. }
  336. // 所有操作成功,提交事务
  337. //err = tx.Commit()
  338. //if err != nil {
  339. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  340. //}
  341. return nil
  342. }
  343. func isLabelIdListMatchFilter(labelIdList []uint64, conditionOperator int, conditionList []custom_types.Condition) bool {
  344. labelIdSet := make(map[uint64]struct{})
  345. for _, id := range labelIdList {
  346. labelIdSet[id] = struct{}{}
  347. }
  348. for _, condition := range conditionList {
  349. match := false
  350. for _, id := range condition.LabelIdList {
  351. if _, ok := labelIdSet[id]; ok {
  352. match = true
  353. break
  354. }
  355. }
  356. if condition.Equal == 2 {
  357. match = !match
  358. }
  359. if conditionOperator == 1 && !match {
  360. return false
  361. } else if conditionOperator == 2 && match {
  362. return true
  363. }
  364. }
  365. return conditionOperator == 1
  366. }