send_wx_on_timeout.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package crontask
  2. import (
  3. "context"
  4. "regexp"
  5. "time"
  6. "wechat-api/ent"
  7. "wechat-api/ent/contact"
  8. "wechat-api/ent/custom_types"
  9. "wechat-api/ent/labelrelationship"
  10. "wechat-api/ent/messagerecords"
  11. "wechat-api/ent/sopnode"
  12. "wechat-api/ent/sopstage"
  13. "wechat-api/ent/soptask"
  14. "wechat-api/internal/utils/dberrorhandler"
  15. )
  16. func (l *CronTask) sendWxOnTimeout() {
  17. ctx := context.Background()
  18. startTime := time.Now()
  19. sopTask, _ := l.svcCtx.DB.SopTask.Query().
  20. Where(
  21. soptask.StatusEQ(3),
  22. soptask.DeletedAtIsNil(),
  23. ).
  24. All(l.ctx)
  25. sopTasks := make([]uint64, 0)
  26. for _, st := range sopTask {
  27. sopTasks = append(sopTasks, st.ID)
  28. }
  29. sopStages, _ := l.svcCtx.DB.SopStage.Query().Where(sopstage.TaskIDIn(sopTasks...), sopstage.DeletedAtIsNil()).All(l.ctx)
  30. stageMap := make(map[uint64]*ent.SopStage)
  31. for _, stage := range sopStages {
  32. stageMap[stage.ID] = stage
  33. }
  34. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  35. nodes, err := l.svcCtx.DB.SopNode.Query().
  36. Where(sopnode.NoReplyConditionNEQ(0)).
  37. Where(sopnode.HasSopStageWith(
  38. sopstage.StatusEQ(1),
  39. sopstage.DeletedAtIsNil(),
  40. sopstage.HasSopTaskWith(
  41. soptask.StatusEQ(3),
  42. soptask.DeletedAtIsNil(),
  43. ),
  44. )).
  45. All(ctx)
  46. if err != nil {
  47. l.Logger.Errorf("get node list failed %v", err)
  48. return
  49. }
  50. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  51. //parentNodes := make([]uint64, 0)
  52. //stages := make([]uint64, 0)
  53. messages := make([]*ent.MessageRecords, 0)
  54. for _, node := range nodes {
  55. var coef uint64 = 1
  56. switch node.NoReplyUnit {
  57. case "W":
  58. coef = 60 * 24 * 7
  59. case "D":
  60. coef = 60 * 24
  61. case "h":
  62. coef = 60
  63. }
  64. // 查询 node 对应的 stage 记录
  65. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
  66. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
  67. if node.ParentID == 0 {
  68. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  69. Where(messagerecords.StatusEQ(3)).
  70. Where(messagerecords.SourceTypeEQ(3)).
  71. Where(messagerecords.SourceIDEQ(node.StageID)).
  72. Where(messagerecords.SubSourceIDEQ(0)).
  73. Where(messagerecords.SendTimeGTE(lowerBound)).
  74. Where(messagerecords.SendTimeLTE(upperBound)).
  75. All(ctx)
  76. } else {
  77. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  78. Where(messagerecords.StatusEQ(3)).
  79. Where(messagerecords.SourceTypeEQ(4)).
  80. Where(messagerecords.SourceIDIn(node.ParentID)).
  81. Where(messagerecords.SubSourceIDEQ(0)).
  82. Where(messagerecords.SendTimeGTE(lowerBound)).
  83. Where(messagerecords.SendTimeLTE(upperBound)).
  84. All(ctx)
  85. }
  86. for _, s := range messages {
  87. // 判断 s.Id 是否是 s.ContactID 的最新记录
  88. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  89. Where(messagerecords.ContactIDEQ(s.ContactID)).
  90. Where(messagerecords.StatusEQ(3)).
  91. Order(ent.Desc(messagerecords.FieldID)).
  92. First(ctx)
  93. if latest.ID == s.ID {
  94. // 创建 MessageRecords 记录
  95. if node.ActionMessage != nil {
  96. for i, c := range node.ActionMessage {
  97. meta := custom_types.Meta{}
  98. if c.Meta != nil {
  99. meta.Filename = c.Meta.Filename
  100. }
  101. _, _ = l.svcCtx.DB.MessageRecords.Create().
  102. SetStatus(1).
  103. SetBotWxid(s.BotWxid).
  104. SetContactID(s.ContactID).
  105. SetContactType(s.ContactType).
  106. SetContactWxid(s.ContactWxid).
  107. SetContentType(c.Type).
  108. SetContent(c.Content).
  109. SetMeta(meta).
  110. SetSourceType(4).
  111. SetSourceID(node.ID).
  112. SetSubSourceID(uint64(i)).
  113. SetOrganizationID(s.OrganizationID).
  114. Save(ctx)
  115. }
  116. } else {
  117. meta := custom_types.Meta{}
  118. _, _ = l.svcCtx.DB.MessageRecords.Create().
  119. SetStatus(1).
  120. SetBotWxid(s.BotWxid).
  121. SetContactID(s.ContactID).
  122. SetContactType(s.ContactType).
  123. SetContactWxid(s.ContactWxid).
  124. SetContentType(1).
  125. SetMeta(meta).
  126. SetSourceType(4).
  127. SetSourceID(node.ID).
  128. SetSubSourceID(0).
  129. SetOrganizationID(s.OrganizationID).
  130. Save(ctx)
  131. }
  132. if node.ActionForward != nil {
  133. if node.ActionForward.Wxid != "" {
  134. forwardWxids := splitString(node.ActionForward.Wxid)
  135. for _, forwardWxid := range forwardWxids {
  136. for i, message := range node.ActionForward.Action {
  137. meta := custom_types.Meta{}
  138. if message.Meta != nil {
  139. meta.Filename = message.Meta.Filename
  140. }
  141. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
  142. contact.WxWxidEQ(s.BotWxid),
  143. contact.WxidEQ(s.ContactWxid),
  144. contact.CtypeIn(1, 3),
  145. ).First(l.ctx)
  146. content := varReplace(message.Content, contactInfo)
  147. _, _ = l.svcCtx.DB.MessageRecords.Create().
  148. SetBotWxid(s.BotWxid).
  149. SetContactID(0).
  150. SetContactType(0).
  151. SetContactWxid(forwardWxid).
  152. SetContentType(message.Type).
  153. SetContent(content).
  154. SetMeta(meta).
  155. SetSourceType(4).
  156. SetSourceID(node.ID).
  157. SetSubSourceID(s.ContactID + uint64(i)).
  158. SetOrganizationID(s.OrganizationID).
  159. Save(l.ctx)
  160. }
  161. }
  162. }
  163. }
  164. // 查询当前联系人的标签关系
  165. currentLabelRelationships, _ := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.ContactID(s.ContactID)).All(l.ctx)
  166. if currentLabelRelationships == nil {
  167. continue
  168. }
  169. // 提取当前标签ID
  170. var currentLabelIds []uint64
  171. for _, relationship := range currentLabelRelationships {
  172. currentLabelIds = append(currentLabelIds, relationship.LabelID)
  173. }
  174. contact := &ent.Contact{
  175. ID: s.ContactID,
  176. Type: s.ContentType,
  177. WxWxid: s.BotWxid,
  178. Wxid: s.ContactWxid,
  179. }
  180. if node.ActionLabelAdd != nil || node.ActionLabelDel != nil {
  181. _ = l.AddLabelRelationships(stageMap, *contact, currentLabelIds, node.ActionLabelAdd, node.ActionLabelDel, s.OrganizationID)
  182. }
  183. }
  184. }
  185. }
  186. finishTime := time.Now()
  187. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  188. return
  189. }
  190. func splitString(input string) []string {
  191. // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma
  192. pattern := `[,,、]`
  193. re := regexp.MustCompile(pattern)
  194. // Split the input string based on the pattern
  195. return re.Split(input, -1)
  196. }
  197. func (l *CronTask) AddLabelRelationships(sopStages map[uint64]*ent.SopStage, contact ent.Contact, currentLabelIds []uint64, addLabelIds []uint64, delLabelIds []uint64, organizationId uint64) (err error) {
  198. //// 开始事务
  199. //tx, err := l.svcCtx.DB.Tx(context.Background())
  200. //if err != nil {
  201. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  202. //}
  203. // 获取 addLabelIds 中不在 currentLabelIds 中的标签ID
  204. var newLabelIds []uint64
  205. var remLabelIds []uint64
  206. var finalLabelIds []uint64
  207. // 创建一个映射,用于快速查找 currentLabelIds 中的元素
  208. currentLabelIdSet := make(map[uint64]struct{})
  209. for _, id := range currentLabelIds {
  210. currentLabelIdSet[id] = struct{}{}
  211. }
  212. delLabelIdSet := make(map[uint64]struct{})
  213. for _, id := range delLabelIds {
  214. delLabelIdSet[id] = struct{}{}
  215. }
  216. if addLabelIds != nil {
  217. // 遍历 addLabelIds,找出不在 currentLabelIds 中的元素
  218. for _, id := range addLabelIds {
  219. if _, ce := currentLabelIdSet[id]; !ce {
  220. if _, re := delLabelIdSet[id]; !re {
  221. newLabelIds = append(newLabelIds, id)
  222. }
  223. }
  224. }
  225. if len(newLabelIds) > 0 {
  226. // 创建需要新增的标签关系
  227. for _, id := range newLabelIds {
  228. currentLabelIdSet[id] = struct{}{}
  229. _, err = l.svcCtx.DB.LabelRelationship.Create().
  230. SetLabelID(id).
  231. SetContactID(contact.ID).
  232. SetOrganizationID(organizationId).
  233. Save(l.ctx)
  234. if err != nil {
  235. //_ = tx.Rollback()
  236. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  237. }
  238. }
  239. // 合并 currentLabelIds 和 newLabelIds
  240. currentLabelIds = append(currentLabelIds, newLabelIds...)
  241. }
  242. }
  243. if delLabelIds != nil {
  244. // 遍历 delLabelIds,找出在 currentLabelIds 中的元素
  245. for _, id := range delLabelIds {
  246. if _, exists := currentLabelIdSet[id]; exists {
  247. remLabelIds = append(remLabelIds, id)
  248. delete(currentLabelIdSet, id)
  249. }
  250. }
  251. if len(remLabelIds) > 0 {
  252. _, err = l.svcCtx.DB.LabelRelationship.Delete().Where(labelrelationship.LabelIDIn(remLabelIds...), labelrelationship.ContactIDEQ(contact.ID), labelrelationship.OrganizationIDEQ(organizationId)).Exec(l.ctx)
  253. if err != nil {
  254. //_ = tx.Rollback()
  255. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  256. }
  257. }
  258. }
  259. if len(newLabelIds) == 0 && len(remLabelIds) == 0 {
  260. return nil
  261. }
  262. for id := range currentLabelIdSet {
  263. finalLabelIds = append(finalLabelIds, id)
  264. }
  265. // 遍历 sop_stages,找出满足条件的 stage
  266. for key, stage := range sopStages {
  267. if stage != nil && stage.ConditionType == 1 && isLabelIdListMatchFilter(finalLabelIds, stage.ConditionOperator, stage.ConditionList) {
  268. // 判断是否有 contact_wxid、source_type、source_id、sub_source_id 相同的记录
  269. _, err := l.svcCtx.DB.MessageRecords.Query().
  270. Where(
  271. messagerecords.ContactWxid(contact.Wxid),
  272. messagerecords.SourceType(3),
  273. messagerecords.SourceID(stage.ID),
  274. messagerecords.SubSourceID(0),
  275. ).
  276. Only(l.ctx)
  277. if !ent.IsNotFound(err) {
  278. continue
  279. }
  280. // 判断ActionMessage是否为空
  281. sourceType := 3
  282. if stage.ActionMessage != nil {
  283. for i, message := range stage.ActionMessage {
  284. meta := custom_types.Meta{}
  285. if message.Meta != nil {
  286. meta.Filename = message.Meta.Filename
  287. }
  288. _, _ = l.svcCtx.DB.MessageRecords.Create().
  289. SetNotNilBotWxid(&contact.WxWxid).
  290. SetNotNilContactID(&contact.ID).
  291. SetNotNilContactType(&contact.Type).
  292. SetNotNilContactWxid(&contact.Wxid).
  293. SetNotNilContentType(&message.Type).
  294. SetNotNilContent(&message.Content).
  295. SetMeta(meta).
  296. SetNotNilSourceType(&sourceType).
  297. SetNotNilSourceID(&stage.ID).
  298. SetSubSourceID(uint64(i)).
  299. SetOrganizationID(organizationId).
  300. Save(l.ctx)
  301. //if err != nil {
  302. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  303. //}
  304. }
  305. }
  306. if stage.ActionForward != nil {
  307. if stage.ActionForward.Wxid != "" {
  308. forwardWxids := splitString(stage.ActionForward.Wxid)
  309. for _, forwardWxid := range forwardWxids {
  310. for i, message := range stage.ActionForward.Action {
  311. meta := custom_types.Meta{}
  312. if message.Meta != nil {
  313. meta.Filename = message.Meta.Filename
  314. }
  315. _, err = l.svcCtx.DB.MessageRecords.Create().
  316. SetBotWxid(contact.WxWxid).
  317. SetContactID(0).
  318. SetContactType(0).
  319. SetContactWxid(forwardWxid).
  320. SetContentType(message.Type).
  321. SetContent(message.Content).
  322. SetMeta(meta).
  323. SetSourceType(sourceType).
  324. SetSourceID(stage.ID).
  325. SetSubSourceID(contact.ID + uint64(i)).
  326. SetOrganizationID(organizationId).
  327. Save(l.ctx)
  328. }
  329. }
  330. }
  331. }
  332. if stage.ActionLabelAdd != nil || stage.ActionLabelDel != nil {
  333. // 递归调用 AddLabelRelationships
  334. sopStages[key] = nil
  335. err = l.AddLabelRelationships(sopStages, contact, finalLabelIds, stage.ActionLabelAdd, stage.ActionLabelDel, organizationId)
  336. if err != nil {
  337. //_ = tx.Rollback()
  338. return err
  339. }
  340. }
  341. }
  342. }
  343. // 所有操作成功,提交事务
  344. //err = tx.Commit()
  345. //if err != nil {
  346. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  347. //}
  348. return nil
  349. }
  350. func isLabelIdListMatchFilter(labelIdList []uint64, conditionOperator int, conditionList []custom_types.Condition) bool {
  351. labelIdSet := make(map[uint64]struct{})
  352. for _, id := range labelIdList {
  353. labelIdSet[id] = struct{}{}
  354. }
  355. for _, condition := range conditionList {
  356. match := false
  357. for _, id := range condition.LabelIdList {
  358. if _, ok := labelIdSet[id]; ok {
  359. match = true
  360. break
  361. }
  362. }
  363. if condition.Equal == 2 {
  364. match = !match
  365. }
  366. if conditionOperator == 1 && !match {
  367. return false
  368. } else if conditionOperator == 2 && match {
  369. return true
  370. }
  371. }
  372. return conditionOperator == 1
  373. }