|
@@ -6,6 +6,7 @@ import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"github.com/suyuan32/simple-admin-common/i18n"
|
|
|
+ "github.com/suyuan32/simple-admin-job/ent"
|
|
|
"github.com/suyuan32/simple-admin-job/ent/custom_types"
|
|
|
"github.com/suyuan32/simple-admin-job/ent/messagerecords"
|
|
|
"github.com/suyuan32/simple-admin-job/ent/sopnode"
|
|
@@ -71,11 +72,12 @@ func (l *SendWxOnTimeoutHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
|
|
// 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
|
|
|
//parentNodes := make([]uint64, 0)
|
|
|
//stages := make([]uint64, 0)
|
|
|
+ messages := make([]*ent.MessageRecords, 0)
|
|
|
for _, node := range nodes {
|
|
|
lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition+2))
|
|
|
upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition))
|
|
|
if node.ParentID == 0 {
|
|
|
- messagesStage, _ := l.svcCtx.WX_DB.MessageRecords.Query().
|
|
|
+ messages, _ = l.svcCtx.WX_DB.MessageRecords.Query().
|
|
|
Where(messagerecords.StatusEQ(3)).
|
|
|
Where(messagerecords.SourceTypeEQ(3)).
|
|
|
Where(messagerecords.SourceIDEQ(node.StageID)).
|
|
@@ -83,30 +85,8 @@ func (l *SendWxOnTimeoutHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
|
|
Where(messagerecords.SendTimeGTE(lowerBound)).
|
|
|
Where(messagerecords.SendTimeLTE(upperBound)).
|
|
|
All(ctx)
|
|
|
- for _, s := range messagesStage {
|
|
|
- // 创建 MessageRecords 记录
|
|
|
- for i1, c1 := range node.ActionMessage {
|
|
|
- meta := custom_types.Meta{}
|
|
|
- if c1.Meta != nil {
|
|
|
- meta.Filename = c1.Meta.Filename
|
|
|
- }
|
|
|
- _, _ = l.svcCtx.WX_DB.MessageRecords.Create().
|
|
|
- SetStatus(1).
|
|
|
- SetBotWxid(s.BotWxid).
|
|
|
- SetContactID(s.ContactID).
|
|
|
- SetContactType(s.ContactType).
|
|
|
- SetContactWxid(s.ContactWxid).
|
|
|
- SetContentType(c1.Type).
|
|
|
- SetContent(c1.Content).
|
|
|
- SetMeta(meta).
|
|
|
- SetSourceType(4).
|
|
|
- SetSourceID(node.ID).
|
|
|
- SetSubSourceID(uint64(i1)).
|
|
|
- Save(ctx)
|
|
|
- }
|
|
|
- }
|
|
|
} else {
|
|
|
- messagesNode, _ := l.svcCtx.WX_DB.MessageRecords.Query().
|
|
|
+ messages, _ = l.svcCtx.WX_DB.MessageRecords.Query().
|
|
|
Where(messagerecords.StatusEQ(3)).
|
|
|
Where(messagerecords.SourceTypeEQ(4)).
|
|
|
Where(messagerecords.SourceIDIn(node.ParentID)).
|
|
@@ -114,26 +94,34 @@ func (l *SendWxOnTimeoutHandler) ProcessTask(ctx context.Context, t *asynq.Task)
|
|
|
Where(messagerecords.SendTimeGTE(lowerBound)).
|
|
|
Where(messagerecords.SendTimeLTE(upperBound)).
|
|
|
All(ctx)
|
|
|
+ }
|
|
|
+ for _, s := range messages {
|
|
|
+ // 判断 s.Id 是否是 s.ContactID 的最新记录
|
|
|
+ latest, _ := l.svcCtx.WX_DB.MessageRecords.Query().
|
|
|
+ Where(messagerecords.ContactIDEQ(s.ContactID)).
|
|
|
+ Where(messagerecords.StatusEQ(3)).
|
|
|
+ Order(ent.Desc(messagerecords.FieldCreatedAt)).
|
|
|
+ First(ctx)
|
|
|
|
|
|
- for _, n := range messagesNode {
|
|
|
+ if latest.ID == s.ID {
|
|
|
// 创建 MessageRecords 记录
|
|
|
- for i2, c2 := range node.ActionMessage {
|
|
|
+ for i, c := range node.ActionMessage {
|
|
|
meta := custom_types.Meta{}
|
|
|
- if c2.Meta != nil {
|
|
|
- meta.Filename = c2.Meta.Filename
|
|
|
+ if c.Meta != nil {
|
|
|
+ meta.Filename = c.Meta.Filename
|
|
|
}
|
|
|
_, _ = l.svcCtx.WX_DB.MessageRecords.Create().
|
|
|
SetStatus(1).
|
|
|
- SetBotWxid(n.BotWxid).
|
|
|
- SetContactID(n.ContactID).
|
|
|
- SetContactType(n.ContactType).
|
|
|
- SetContactWxid(n.ContactWxid).
|
|
|
- SetContentType(c2.Type).
|
|
|
- SetContent(c2.Content).
|
|
|
+ SetBotWxid(s.BotWxid).
|
|
|
+ SetContactID(s.ContactID).
|
|
|
+ SetContactType(s.ContactType).
|
|
|
+ SetContactWxid(s.ContactWxid).
|
|
|
+ SetContentType(c.Type).
|
|
|
+ SetContent(c.Content).
|
|
|
SetMeta(meta).
|
|
|
SetSourceType(4).
|
|
|
SetSourceID(node.ID).
|
|
|
- SetSubSourceID(uint64(i2)).
|
|
|
+ SetSubSourceID(uint64(i)).
|
|
|
Save(ctx)
|
|
|
}
|
|
|
}
|