send_wx_on_timeout.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. package crontask
  2. import (
  3. "context"
  4. "github.com/zeromicro/go-zero/core/logx"
  5. "regexp"
  6. "time"
  7. "wechat-api/ent"
  8. "wechat-api/ent/contact"
  9. "wechat-api/ent/custom_types"
  10. "wechat-api/ent/labelrelationship"
  11. "wechat-api/ent/messagerecords"
  12. "wechat-api/ent/sopnode"
  13. "wechat-api/ent/sopstage"
  14. "wechat-api/ent/soptask"
  15. "wechat-api/internal/utils/dberrorhandler"
  16. )
  17. func (l *CronTask) sendWxOnTimeout() {
  18. ctx := context.Background()
  19. startTime := time.Now()
  20. sopTask, _ := l.svcCtx.DB.SopTask.Query().
  21. Where(
  22. soptask.StatusEQ(3),
  23. soptask.DeletedAtIsNil(),
  24. ).
  25. All(l.ctx)
  26. sopTasks := make([]uint64, 0)
  27. for _, st := range sopTask {
  28. sopTasks = append(sopTasks, st.ID)
  29. }
  30. botStageMap := make(map[string]map[uint64]*ent.SopStage)
  31. for _, t := range sopTask {
  32. for _, bot := range t.BotWxidList {
  33. sopStages, _ := l.svcCtx.DB.SopStage.Query().Where(sopstage.TaskIDEQ(t.ID), sopstage.DeletedAtIsNil()).All(l.ctx)
  34. stageMap := make(map[uint64]*ent.SopStage)
  35. for _, stage := range sopStages {
  36. stageMap[stage.ID] = stage
  37. }
  38. botStageMap[bot] = stageMap
  39. }
  40. }
  41. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  42. nodes, err := l.svcCtx.DB.SopNode.Query().
  43. Where(sopnode.NoReplyConditionNEQ(0)).
  44. Where(sopnode.HasSopStageWith(
  45. sopstage.StatusEQ(1),
  46. sopstage.DeletedAtIsNil(),
  47. sopstage.HasSopTaskWith(
  48. soptask.StatusEQ(3),
  49. soptask.DeletedAtIsNil(),
  50. ),
  51. )).
  52. All(ctx)
  53. if err != nil {
  54. l.Logger.Errorf("get node list failed %v", err)
  55. return
  56. }
  57. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  58. //parentNodes := make([]uint64, 0)
  59. //stages := make([]uint64, 0)
  60. messages := make([]*ent.MessageRecords, 0)
  61. for _, node := range nodes {
  62. var coef uint64 = 1
  63. switch node.NoReplyUnit {
  64. case "W":
  65. coef = 60 * 24 * 7
  66. case "D":
  67. coef = 60 * 24
  68. case "h":
  69. coef = 60
  70. }
  71. // 查询 node 对应的 stage 记录
  72. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
  73. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
  74. if node.ParentID == 0 {
  75. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  76. Where(messagerecords.StatusEQ(3)).
  77. Where(messagerecords.SourceTypeEQ(3)).
  78. Where(messagerecords.SourceIDEQ(node.StageID)).
  79. Where(messagerecords.SendTimeGTE(lowerBound)).
  80. Where(messagerecords.SendTimeLTE(upperBound)).
  81. All(ctx)
  82. } else {
  83. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  84. Where(messagerecords.StatusEQ(3)).
  85. Where(messagerecords.SourceTypeEQ(4)).
  86. Where(messagerecords.SourceIDIn(node.ParentID)).
  87. Where(messagerecords.SendTimeGTE(lowerBound)).
  88. Where(messagerecords.SendTimeLTE(upperBound)).
  89. All(ctx)
  90. }
  91. for _, s := range messages {
  92. // 判断 s.Id 是否是 s.ContactID 的最新记录
  93. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  94. Where(messagerecords.ContactIDEQ(s.ContactID)).
  95. Where(messagerecords.StatusEQ(3)).
  96. Order(ent.Desc(messagerecords.FieldID)).
  97. First(ctx)
  98. logx.Info("latest.ID: ", latest.ID)
  99. logx.Info("s.ID: ", s.ID)
  100. if latest.ID == s.ID {
  101. // 创建 MessageRecords 记录
  102. if node.ActionMessage != nil {
  103. for i, c := range node.ActionMessage {
  104. meta := custom_types.Meta{}
  105. if c.Meta != nil {
  106. meta.Filename = c.Meta.Filename
  107. }
  108. _, _ = l.svcCtx.DB.MessageRecords.Create().
  109. SetStatus(1).
  110. SetBotWxid(s.BotWxid).
  111. SetContactID(s.ContactID).
  112. SetContactType(s.ContactType).
  113. SetContactWxid(s.ContactWxid).
  114. SetContentType(c.Type).
  115. SetContent(c.Content).
  116. SetMeta(meta).
  117. SetSourceType(4).
  118. SetSourceID(node.ID).
  119. SetSubSourceID(uint64(i)).
  120. SetOrganizationID(s.OrganizationID).
  121. Save(ctx)
  122. }
  123. } else {
  124. meta := custom_types.Meta{}
  125. _, _ = l.svcCtx.DB.MessageRecords.Create().
  126. SetStatus(1).
  127. SetBotWxid(s.BotWxid).
  128. SetContactID(s.ContactID).
  129. SetContactType(s.ContactType).
  130. SetContactWxid(s.ContactWxid).
  131. SetContentType(1).
  132. SetMeta(meta).
  133. SetSourceType(4).
  134. SetSourceID(node.ID).
  135. SetSubSourceID(0).
  136. SetOrganizationID(s.OrganizationID).
  137. Save(ctx)
  138. }
  139. if node.ActionForward != nil {
  140. if node.ActionForward.Wxid != "" {
  141. forwardWxids := splitString(node.ActionForward.Wxid)
  142. for _, forwardWxid := range forwardWxids {
  143. for i, message := range node.ActionForward.Action {
  144. meta := custom_types.Meta{}
  145. if message.Meta != nil {
  146. meta.Filename = message.Meta.Filename
  147. }
  148. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
  149. contact.WxWxidEQ(s.BotWxid),
  150. contact.WxidEQ(s.ContactWxid),
  151. contact.CtypeIn(1, 3),
  152. ).First(l.ctx)
  153. content := varReplace(message.Content, contactInfo)
  154. _, _ = l.svcCtx.DB.MessageRecords.Create().
  155. SetBotWxid(s.BotWxid).
  156. SetContactID(0).
  157. SetContactType(0).
  158. SetContactWxid(forwardWxid).
  159. SetContentType(message.Type).
  160. SetContent(content).
  161. SetMeta(meta).
  162. SetSourceType(4).
  163. SetSourceID(node.ID).
  164. SetSubSourceID(s.ContactID + uint64(i)).
  165. SetOrganizationID(s.OrganizationID).
  166. Save(l.ctx)
  167. }
  168. }
  169. }
  170. }
  171. // 查询当前联系人的标签关系
  172. currentLabelRelationships, _ := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.ContactID(s.ContactID)).All(l.ctx)
  173. if currentLabelRelationships == nil {
  174. continue
  175. }
  176. // 提取当前标签ID
  177. var currentLabelIds []uint64
  178. for _, relationship := range currentLabelRelationships {
  179. currentLabelIds = append(currentLabelIds, relationship.LabelID)
  180. }
  181. contact := &ent.Contact{
  182. ID: s.ContactID,
  183. Type: s.ContactType,
  184. WxWxid: s.BotWxid,
  185. Wxid: s.ContactWxid,
  186. }
  187. if node.ActionLabelAdd != nil || node.ActionLabelDel != nil {
  188. visitedStages := make(map[uint64]bool)
  189. _ = l.AddLabelRelationships(botStageMap[s.BotWxid], *contact, currentLabelIds, node.ActionLabelAdd, node.ActionLabelDel, s.OrganizationID, visitedStages)
  190. }
  191. }
  192. }
  193. }
  194. finishTime := time.Now()
  195. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  196. return
  197. }
  198. func splitString(input string) []string {
  199. // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma
  200. pattern := `[,,、]`
  201. re := regexp.MustCompile(pattern)
  202. // Split the input string based on the pattern
  203. return re.Split(input, -1)
  204. }
  205. func (l *CronTask) AddLabelRelationships(sopStages map[uint64]*ent.SopStage, contact ent.Contact, currentLabelIds []uint64, addLabelIds []uint64, delLabelIds []uint64, organizationId uint64, visitedStages map[uint64]bool) (err error) {
  206. //// 开始事务
  207. //tx, err := l.svcCtx.DB.Tx(context.Background())
  208. //if err != nil {
  209. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  210. //}
  211. // 获取 addLabelIds 中不在 currentLabelIds 中的标签ID
  212. var newLabelIds []uint64
  213. var remLabelIds []uint64
  214. var finalLabelIds []uint64
  215. // 创建一个映射,用于快速查找 currentLabelIds 中的元素
  216. currentLabelIdSet := make(map[uint64]struct{})
  217. for _, id := range currentLabelIds {
  218. currentLabelIdSet[id] = struct{}{}
  219. }
  220. delLabelIdSet := make(map[uint64]struct{})
  221. for _, id := range delLabelIds {
  222. delLabelIdSet[id] = struct{}{}
  223. }
  224. if addLabelIds != nil {
  225. // 遍历 addLabelIds,找出不在 currentLabelIds 中的元素
  226. for _, id := range addLabelIds {
  227. if _, ce := currentLabelIdSet[id]; !ce {
  228. if _, re := delLabelIdSet[id]; !re {
  229. newLabelIds = append(newLabelIds, id)
  230. }
  231. }
  232. }
  233. if len(newLabelIds) > 0 {
  234. // 创建需要新增的标签关系
  235. for _, id := range newLabelIds {
  236. currentLabelIdSet[id] = struct{}{}
  237. _, err = l.svcCtx.DB.LabelRelationship.Create().
  238. SetLabelID(id).
  239. SetContactID(contact.ID).
  240. SetOrganizationID(organizationId).
  241. Save(l.ctx)
  242. if err != nil {
  243. //_ = tx.Rollback()
  244. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  245. }
  246. }
  247. // 合并 currentLabelIds 和 newLabelIds
  248. currentLabelIds = append(currentLabelIds, newLabelIds...)
  249. }
  250. }
  251. if delLabelIds != nil {
  252. // 遍历 delLabelIds,找出在 currentLabelIds 中的元素
  253. for _, id := range delLabelIds {
  254. if _, exists := currentLabelIdSet[id]; exists {
  255. remLabelIds = append(remLabelIds, id)
  256. delete(currentLabelIdSet, id)
  257. }
  258. }
  259. if len(remLabelIds) > 0 {
  260. _, err = l.svcCtx.DB.LabelRelationship.Delete().Where(labelrelationship.LabelIDIn(remLabelIds...), labelrelationship.ContactIDEQ(contact.ID), labelrelationship.OrganizationIDEQ(organizationId)).Exec(l.ctx)
  261. if err != nil {
  262. //_ = tx.Rollback()
  263. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  264. }
  265. }
  266. }
  267. if len(newLabelIds) == 0 && len(remLabelIds) == 0 {
  268. return nil
  269. }
  270. for id := range currentLabelIdSet {
  271. finalLabelIds = append(finalLabelIds, id)
  272. }
  273. // 遍历 sop_stages,找出满足条件的 stage
  274. for key, stage := range sopStages {
  275. if stage == nil || visitedStages[key] {
  276. continue
  277. }
  278. if stage.ConditionType == 1 && isLabelIdListMatchFilter(finalLabelIds, stage.ConditionOperator, stage.ConditionList) {
  279. // 判断是否有 contact_wxid、source_type、source_id、sub_source_id 相同的记录
  280. _, err := l.svcCtx.DB.MessageRecords.Query().
  281. Where(
  282. messagerecords.ContactWxid(contact.Wxid),
  283. messagerecords.SourceType(3),
  284. messagerecords.SourceID(stage.ID),
  285. messagerecords.SubSourceID(0),
  286. ).
  287. Only(l.ctx)
  288. if !ent.IsNotFound(err) {
  289. continue
  290. }
  291. // 判断ActionMessage是否为空
  292. sourceType := 3
  293. if stage.ActionMessage != nil {
  294. for i, message := range stage.ActionMessage {
  295. meta := custom_types.Meta{}
  296. if message.Meta != nil {
  297. meta.Filename = message.Meta.Filename
  298. }
  299. _, _ = l.svcCtx.DB.MessageRecords.Create().
  300. SetNotNilBotWxid(&contact.WxWxid).
  301. SetNotNilContactID(&contact.ID).
  302. SetNotNilContactType(&contact.Type).
  303. SetNotNilContactWxid(&contact.Wxid).
  304. SetNotNilContentType(&message.Type).
  305. SetNotNilContent(&message.Content).
  306. SetMeta(meta).
  307. SetNotNilSourceType(&sourceType).
  308. SetNotNilSourceID(&stage.ID).
  309. SetSubSourceID(uint64(i)).
  310. SetOrganizationID(organizationId).
  311. Save(l.ctx)
  312. //if err != nil {
  313. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  314. //}
  315. }
  316. }
  317. if stage.ActionForward != nil {
  318. if stage.ActionForward.Wxid != "" {
  319. forwardWxids := splitString(stage.ActionForward.Wxid)
  320. for _, forwardWxid := range forwardWxids {
  321. for i, message := range stage.ActionForward.Action {
  322. meta := custom_types.Meta{}
  323. if message.Meta != nil {
  324. meta.Filename = message.Meta.Filename
  325. }
  326. _, err = l.svcCtx.DB.MessageRecords.Create().
  327. SetBotWxid(contact.WxWxid).
  328. SetContactID(0).
  329. SetContactType(0).
  330. SetContactWxid(forwardWxid).
  331. SetContentType(message.Type).
  332. SetContent(message.Content).
  333. SetMeta(meta).
  334. SetSourceType(sourceType).
  335. SetSourceID(stage.ID).
  336. SetSubSourceID(contact.ID + uint64(i)).
  337. SetOrganizationID(organizationId).
  338. Save(l.ctx)
  339. }
  340. }
  341. }
  342. }
  343. if stage.ActionLabelAdd != nil || stage.ActionLabelDel != nil {
  344. // 递归调用 AddLabelRelationships
  345. visitedStages[key] = true
  346. err = l.AddLabelRelationships(sopStages, contact, finalLabelIds, stage.ActionLabelAdd, stage.ActionLabelDel, organizationId, visitedStages)
  347. if err != nil {
  348. //_ = tx.Rollback()
  349. return err
  350. }
  351. }
  352. }
  353. }
  354. // 所有操作成功,提交事务
  355. //err = tx.Commit()
  356. //if err != nil {
  357. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  358. //}
  359. return nil
  360. }
  361. func isLabelIdListMatchFilter(labelIdList []uint64, conditionOperator int, conditionList []custom_types.Condition) bool {
  362. labelIdSet := make(map[uint64]struct{})
  363. for _, id := range labelIdList {
  364. labelIdSet[id] = struct{}{}
  365. }
  366. for _, condition := range conditionList {
  367. match := false
  368. for _, id := range condition.LabelIdList {
  369. if _, ok := labelIdSet[id]; ok {
  370. match = true
  371. break
  372. }
  373. }
  374. if condition.Equal == 2 {
  375. match = !match
  376. }
  377. if conditionOperator == 1 && !match {
  378. return false
  379. } else if conditionOperator == 2 && match {
  380. return true
  381. }
  382. }
  383. return conditionOperator == 1
  384. }