12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package crontask
- import (
- "context"
- "time"
- "wechat-api/ent"
- "wechat-api/ent/custom_types"
- "wechat-api/ent/messagerecords"
- "wechat-api/ent/sopnode"
- "wechat-api/ent/sopstage"
- "wechat-api/ent/soptask"
- )
- func (l *CronTask) sendWxOnTimeout() {
- ctx := context.Background()
- startTime := time.Now()
- // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
- nodes, err := l.svcCtx.DB.SopNode.Query().
- Where(sopnode.NoReplyConditionNEQ(0)).
- Where(sopnode.HasSopStageWith(
- sopstage.StatusEQ(1),
- sopstage.DeletedAtIsNil(),
- sopstage.HasSopTaskWith(
- soptask.StatusEQ(3),
- soptask.DeletedAtIsNil(),
- ),
- )).
- All(ctx)
- if err != nil {
- l.Logger.Errorf("get node list failed %v", err)
- return
- }
- // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
- //parentNodes := make([]uint64, 0)
- //stages := make([]uint64, 0)
- messages := make([]*ent.MessageRecords, 0)
- for _, node := range nodes {
- lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition+2))
- upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition))
- if node.ParentID == 0 {
- messages, _ = l.svcCtx.DB.MessageRecords.Query().
- Where(messagerecords.StatusEQ(3)).
- Where(messagerecords.SourceTypeEQ(3)).
- Where(messagerecords.SourceIDEQ(node.StageID)).
- Where(messagerecords.SubSourceIDEQ(0)).
- Where(messagerecords.SendTimeGTE(lowerBound)).
- Where(messagerecords.SendTimeLTE(upperBound)).
- All(ctx)
- } else {
- messages, _ = l.svcCtx.DB.MessageRecords.Query().
- Where(messagerecords.StatusEQ(3)).
- Where(messagerecords.SourceTypeEQ(4)).
- Where(messagerecords.SourceIDIn(node.ParentID)).
- Where(messagerecords.SubSourceIDEQ(0)).
- Where(messagerecords.SendTimeGTE(lowerBound)).
- Where(messagerecords.SendTimeLTE(upperBound)).
- All(ctx)
- }
- for _, s := range messages {
- // 判断 s.Id 是否是 s.ContactID 的最新记录
- latest, _ := l.svcCtx.DB.MessageRecords.Query().
- Where(messagerecords.ContactIDEQ(s.ContactID)).
- Where(messagerecords.StatusEQ(3)).
- Order(ent.Desc(messagerecords.FieldCreatedAt)).
- First(ctx)
- if latest.ID == s.ID {
- // 创建 MessageRecords 记录
- for i, c := range node.ActionMessage {
- meta := custom_types.Meta{}
- if c.Meta != nil {
- meta.Filename = c.Meta.Filename
- }
- _, _ = l.svcCtx.DB.MessageRecords.Create().
- SetStatus(1).
- SetBotWxid(s.BotWxid).
- SetContactID(s.ContactID).
- SetContactType(s.ContactType).
- SetContactWxid(s.ContactWxid).
- SetContentType(c.Type).
- SetContent(c.Content).
- SetMeta(meta).
- SetSourceType(4).
- SetSourceID(node.ID).
- SetSubSourceID(uint64(i)).
- Save(ctx)
- }
- }
- }
- }
- finishTime := time.Now()
- l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
- return
- }
|