send_wx_on_timeout.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. package crontask
  2. import (
  3. "context"
  4. "github.com/zeromicro/go-zero/core/logx"
  5. "regexp"
  6. "time"
  7. "wechat-api/ent"
  8. "wechat-api/ent/contact"
  9. "wechat-api/ent/custom_types"
  10. "wechat-api/ent/labelrelationship"
  11. "wechat-api/ent/messagerecords"
  12. "wechat-api/ent/sopnode"
  13. "wechat-api/ent/sopstage"
  14. "wechat-api/ent/soptask"
  15. "wechat-api/internal/utils/dberrorhandler"
  16. )
  17. func (l *CronTask) sendWxOnTimeout() {
  18. ctx := context.Background()
  19. startTime := time.Now()
  20. sopTask, _ := l.svcCtx.DB.SopTask.Query().
  21. Where(
  22. soptask.StatusEQ(3),
  23. soptask.DeletedAtIsNil(),
  24. ).
  25. All(l.ctx)
  26. sopTasks := make([]uint64, 0)
  27. for _, st := range sopTask {
  28. sopTasks = append(sopTasks, st.ID)
  29. }
  30. sopStages, _ := l.svcCtx.DB.SopStage.Query().Where(sopstage.TaskIDIn(sopTasks...), sopstage.DeletedAtIsNil()).All(l.ctx)
  31. stageMap := make(map[uint64]*ent.SopStage)
  32. for _, stage := range sopStages {
  33. stageMap[stage.ID] = stage
  34. }
  35. // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
  36. nodes, err := l.svcCtx.DB.SopNode.Query().
  37. Where(sopnode.NoReplyConditionNEQ(0)).
  38. Where(sopnode.HasSopStageWith(
  39. sopstage.StatusEQ(1),
  40. sopstage.DeletedAtIsNil(),
  41. sopstage.HasSopTaskWith(
  42. soptask.StatusEQ(3),
  43. soptask.DeletedAtIsNil(),
  44. ),
  45. )).
  46. All(ctx)
  47. if err != nil {
  48. l.Logger.Errorf("get node list failed %v", err)
  49. return
  50. }
  51. // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
  52. //parentNodes := make([]uint64, 0)
  53. //stages := make([]uint64, 0)
  54. messages := make([]*ent.MessageRecords, 0)
  55. for _, node := range nodes {
  56. var coef uint64 = 1
  57. switch node.NoReplyUnit {
  58. case "W":
  59. coef = 60 * 24 * 7
  60. case "D":
  61. coef = 60 * 24
  62. case "h":
  63. coef = 60
  64. }
  65. // 查询 node 对应的 stage 记录
  66. lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
  67. upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
  68. if node.ParentID == 0 {
  69. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  70. Where(messagerecords.StatusEQ(3)).
  71. Where(messagerecords.SourceTypeEQ(3)).
  72. Where(messagerecords.SourceIDEQ(node.StageID)).
  73. Where(messagerecords.SubSourceIDEQ(0)).
  74. //Where(messagerecords.SendTimeGTE(lowerBound)).
  75. Where(messagerecords.SendTimeLTE(upperBound)).
  76. All(ctx)
  77. } else {
  78. messages, _ = l.svcCtx.DB.MessageRecords.Query().
  79. Where(messagerecords.StatusEQ(3)).
  80. Where(messagerecords.SourceTypeEQ(4)).
  81. Where(messagerecords.SourceIDIn(node.ParentID)).
  82. Where(messagerecords.SubSourceIDEQ(0)).
  83. Where(messagerecords.SendTimeGTE(lowerBound)).
  84. Where(messagerecords.SendTimeLTE(upperBound)).
  85. All(ctx)
  86. }
  87. for _, s := range messages {
  88. // 判断 s.Id 是否是 s.ContactID 的最新记录
  89. latest, _ := l.svcCtx.DB.MessageRecords.Query().
  90. Where(messagerecords.ContactIDEQ(s.ContactID)).
  91. Where(messagerecords.StatusEQ(3)).
  92. Order(ent.Desc(messagerecords.FieldID)).
  93. First(ctx)
  94. if latest.ID == s.ID {
  95. // 创建 MessageRecords 记录
  96. if node.ActionMessage != nil {
  97. for i, c := range node.ActionMessage {
  98. meta := custom_types.Meta{}
  99. if c.Meta != nil {
  100. meta.Filename = c.Meta.Filename
  101. }
  102. _, _ = l.svcCtx.DB.MessageRecords.Create().
  103. SetStatus(1).
  104. SetBotWxid(s.BotWxid).
  105. SetContactID(s.ContactID).
  106. SetContactType(s.ContactType).
  107. SetContactWxid(s.ContactWxid).
  108. SetContentType(c.Type).
  109. SetContent(c.Content).
  110. SetMeta(meta).
  111. SetSourceType(4).
  112. SetSourceID(node.ID).
  113. SetSubSourceID(uint64(i)).
  114. SetOrganizationID(s.OrganizationID).
  115. Save(ctx)
  116. }
  117. } else {
  118. meta := custom_types.Meta{}
  119. _, _ = l.svcCtx.DB.MessageRecords.Create().
  120. SetStatus(1).
  121. SetBotWxid(s.BotWxid).
  122. SetContactID(s.ContactID).
  123. SetContactType(s.ContactType).
  124. SetContactWxid(s.ContactWxid).
  125. SetContentType(1).
  126. SetMeta(meta).
  127. SetSourceType(4).
  128. SetSourceID(node.ID).
  129. SetSubSourceID(0).
  130. SetOrganizationID(s.OrganizationID).
  131. Save(ctx)
  132. }
  133. if node.ActionForward != nil {
  134. if node.ActionForward.Wxid != "" {
  135. forwardWxids := splitString(node.ActionForward.Wxid)
  136. for _, forwardWxid := range forwardWxids {
  137. for i, message := range node.ActionForward.Action {
  138. meta := custom_types.Meta{}
  139. if message.Meta != nil {
  140. meta.Filename = message.Meta.Filename
  141. }
  142. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
  143. contact.WxWxidEQ(s.BotWxid),
  144. contact.WxidEQ(s.ContactWxid),
  145. contact.CtypeIn(1, 3),
  146. ).First(l.ctx)
  147. content := varReplace(message.Content, contactInfo)
  148. _, _ = l.svcCtx.DB.MessageRecords.Create().
  149. SetBotWxid(s.BotWxid).
  150. SetContactID(0).
  151. SetContactType(0).
  152. SetContactWxid(forwardWxid).
  153. SetContentType(message.Type).
  154. SetContent(content).
  155. SetMeta(meta).
  156. SetSourceType(4).
  157. SetSourceID(node.ID).
  158. SetSubSourceID(s.ContactID + uint64(i)).
  159. SetOrganizationID(s.OrganizationID).
  160. Save(l.ctx)
  161. }
  162. }
  163. }
  164. }
  165. // 查询当前联系人的标签关系
  166. currentLabelRelationships, _ := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.ContactID(s.ContactID)).All(l.ctx)
  167. if currentLabelRelationships == nil {
  168. continue
  169. }
  170. // 提取当前标签ID
  171. var currentLabelIds []uint64
  172. for _, relationship := range currentLabelRelationships {
  173. currentLabelIds = append(currentLabelIds, relationship.LabelID)
  174. }
  175. contact := &ent.Contact{
  176. ID: s.ContactID,
  177. Type: s.ContentType,
  178. WxWxid: s.BotWxid,
  179. Wxid: s.ContactWxid,
  180. }
  181. if node.ActionLabelAdd != nil || node.ActionLabelDel != nil {
  182. visitedStages := make(map[uint64]bool)
  183. _ = l.AddLabelRelationships(stageMap, *contact, currentLabelIds, node.ActionLabelAdd, node.ActionLabelDel, s.OrganizationID, visitedStages)
  184. }
  185. }
  186. }
  187. }
  188. finishTime := time.Now()
  189. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  190. return
  191. }
  192. func splitString(input string) []string {
  193. // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma
  194. pattern := `[,,、]`
  195. re := regexp.MustCompile(pattern)
  196. // Split the input string based on the pattern
  197. return re.Split(input, -1)
  198. }
  199. func (l *CronTask) AddLabelRelationships(sopStages map[uint64]*ent.SopStage, contact ent.Contact, currentLabelIds []uint64, addLabelIds []uint64, delLabelIds []uint64, organizationId uint64, visitedStages map[uint64]bool) (err error) {
  200. //// 开始事务
  201. //tx, err := l.svcCtx.DB.Tx(context.Background())
  202. //if err != nil {
  203. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  204. //}
  205. // 获取 addLabelIds 中不在 currentLabelIds 中的标签ID
  206. var newLabelIds []uint64
  207. var remLabelIds []uint64
  208. var finalLabelIds []uint64
  209. // 创建一个映射,用于快速查找 currentLabelIds 中的元素
  210. currentLabelIdSet := make(map[uint64]struct{})
  211. for _, id := range currentLabelIds {
  212. currentLabelIdSet[id] = struct{}{}
  213. }
  214. delLabelIdSet := make(map[uint64]struct{})
  215. for _, id := range delLabelIds {
  216. delLabelIdSet[id] = struct{}{}
  217. }
  218. if addLabelIds != nil {
  219. // 遍历 addLabelIds,找出不在 currentLabelIds 中的元素
  220. for _, id := range addLabelIds {
  221. if _, ce := currentLabelIdSet[id]; !ce {
  222. if _, re := delLabelIdSet[id]; !re {
  223. newLabelIds = append(newLabelIds, id)
  224. }
  225. }
  226. }
  227. if len(newLabelIds) > 0 {
  228. // 创建需要新增的标签关系
  229. for _, id := range newLabelIds {
  230. currentLabelIdSet[id] = struct{}{}
  231. _, err = l.svcCtx.DB.LabelRelationship.Create().
  232. SetLabelID(id).
  233. SetContactID(contact.ID).
  234. SetOrganizationID(organizationId).
  235. Save(l.ctx)
  236. if err != nil {
  237. //_ = tx.Rollback()
  238. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  239. }
  240. }
  241. // 合并 currentLabelIds 和 newLabelIds
  242. currentLabelIds = append(currentLabelIds, newLabelIds...)
  243. }
  244. }
  245. if delLabelIds != nil {
  246. // 遍历 delLabelIds,找出在 currentLabelIds 中的元素
  247. for _, id := range delLabelIds {
  248. if _, exists := currentLabelIdSet[id]; exists {
  249. remLabelIds = append(remLabelIds, id)
  250. delete(currentLabelIdSet, id)
  251. }
  252. }
  253. if len(remLabelIds) > 0 {
  254. _, err = l.svcCtx.DB.LabelRelationship.Delete().Where(labelrelationship.LabelIDIn(remLabelIds...), labelrelationship.ContactIDEQ(contact.ID), labelrelationship.OrganizationIDEQ(organizationId)).Exec(l.ctx)
  255. if err != nil {
  256. //_ = tx.Rollback()
  257. return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  258. }
  259. }
  260. }
  261. if len(newLabelIds) == 0 && len(remLabelIds) == 0 {
  262. return nil
  263. }
  264. for id := range currentLabelIdSet {
  265. finalLabelIds = append(finalLabelIds, id)
  266. }
  267. // 遍历 sop_stages,找出满足条件的 stage
  268. for key, stage := range sopStages {
  269. if stage == nil || visitedStages[key] {
  270. continue
  271. }
  272. if stage.ConditionType == 1 && isLabelIdListMatchFilter(finalLabelIds, stage.ConditionOperator, stage.ConditionList) {
  273. // 判断是否有 contact_wxid、source_type、source_id、sub_source_id 相同的记录
  274. x, err := l.svcCtx.DB.MessageRecords.Query().
  275. Where(
  276. messagerecords.ContactWxid(contact.Wxid),
  277. messagerecords.SourceType(3),
  278. messagerecords.SourceID(stage.ID),
  279. messagerecords.SubSourceID(0),
  280. ).
  281. Only(l.ctx)
  282. logx.Info("x: ", x)
  283. logx.Info("err: ", err)
  284. if !ent.IsNotFound(err) {
  285. logx.Info("continue")
  286. continue
  287. }
  288. // 判断ActionMessage是否为空
  289. sourceType := 3
  290. if stage.ActionMessage != nil {
  291. for i, message := range stage.ActionMessage {
  292. meta := custom_types.Meta{}
  293. if message.Meta != nil {
  294. meta.Filename = message.Meta.Filename
  295. }
  296. _, _ = l.svcCtx.DB.MessageRecords.Create().
  297. SetNotNilBotWxid(&contact.WxWxid).
  298. SetNotNilContactID(&contact.ID).
  299. SetNotNilContactType(&contact.Type).
  300. SetNotNilContactWxid(&contact.Wxid).
  301. SetNotNilContentType(&message.Type).
  302. SetNotNilContent(&message.Content).
  303. SetMeta(meta).
  304. SetNotNilSourceType(&sourceType).
  305. SetNotNilSourceID(&stage.ID).
  306. SetSubSourceID(uint64(i)).
  307. SetOrganizationID(organizationId).
  308. Save(l.ctx)
  309. //if err != nil {
  310. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  311. //}
  312. }
  313. }
  314. if stage.ActionForward != nil {
  315. if stage.ActionForward.Wxid != "" {
  316. forwardWxids := splitString(stage.ActionForward.Wxid)
  317. for _, forwardWxid := range forwardWxids {
  318. for i, message := range stage.ActionForward.Action {
  319. meta := custom_types.Meta{}
  320. if message.Meta != nil {
  321. meta.Filename = message.Meta.Filename
  322. }
  323. _, err = l.svcCtx.DB.MessageRecords.Create().
  324. SetBotWxid(contact.WxWxid).
  325. SetContactID(0).
  326. SetContactType(0).
  327. SetContactWxid(forwardWxid).
  328. SetContentType(message.Type).
  329. SetContent(message.Content).
  330. SetMeta(meta).
  331. SetSourceType(sourceType).
  332. SetSourceID(stage.ID).
  333. SetSubSourceID(contact.ID + uint64(i)).
  334. SetOrganizationID(organizationId).
  335. Save(l.ctx)
  336. }
  337. }
  338. }
  339. }
  340. if stage.ActionLabelAdd != nil || stage.ActionLabelDel != nil {
  341. // 递归调用 AddLabelRelationships
  342. visitedStages[key] = true
  343. err = l.AddLabelRelationships(sopStages, contact, finalLabelIds, stage.ActionLabelAdd, stage.ActionLabelDel, organizationId, visitedStages)
  344. if err != nil {
  345. //_ = tx.Rollback()
  346. return err
  347. }
  348. }
  349. }
  350. }
  351. // 所有操作成功,提交事务
  352. //err = tx.Commit()
  353. //if err != nil {
  354. // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
  355. //}
  356. return nil
  357. }
  358. func isLabelIdListMatchFilter(labelIdList []uint64, conditionOperator int, conditionList []custom_types.Condition) bool {
  359. labelIdSet := make(map[uint64]struct{})
  360. for _, id := range labelIdList {
  361. labelIdSet[id] = struct{}{}
  362. }
  363. for _, condition := range conditionList {
  364. match := false
  365. for _, id := range condition.LabelIdList {
  366. if _, ok := labelIdSet[id]; ok {
  367. match = true
  368. break
  369. }
  370. }
  371. if condition.Equal == 2 {
  372. match = !match
  373. }
  374. if conditionOperator == 1 && !match {
  375. return false
  376. } else if conditionOperator == 2 && match {
  377. return true
  378. }
  379. }
  380. return conditionOperator == 1
  381. }