package crontask import ( "context" "regexp" "time" "wechat-api/ent" "wechat-api/ent/custom_types" "wechat-api/ent/messagerecords" "wechat-api/ent/sopnode" "wechat-api/ent/sopstage" "wechat-api/ent/soptask" ) func (l *CronTask) sendWxOnTimeout() { ctx := context.Background() startTime := time.Now() // 查询所有 no_reply_condition 值非 0 的 sop_node 记录 nodes, err := l.svcCtx.DB.SopNode.Query(). Where(sopnode.NoReplyConditionNEQ(0)). Where(sopnode.HasSopStageWith( sopstage.StatusEQ(1), sopstage.DeletedAtIsNil(), sopstage.HasSopTaskWith( soptask.StatusEQ(3), soptask.DeletedAtIsNil(), ), )). All(ctx) if err != nil { l.Logger.Errorf("get node list failed %v", err) return } // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中 //parentNodes := make([]uint64, 0) //stages := make([]uint64, 0) messages := make([]*ent.MessageRecords, 0) for _, node := range nodes { var coef uint64 = 1 switch node.NoReplyUnit { case "W": coef = 60 * 24 * 7 case "D": coef = 60 * 24 case "h": coef = 60 } // 查询 node 对应的 stage 记录 lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2)) upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef)) if node.ParentID == 0 { messages, _ = l.svcCtx.DB.MessageRecords.Query(). Where(messagerecords.StatusEQ(3)). Where(messagerecords.SourceTypeEQ(3)). Where(messagerecords.SourceIDEQ(node.StageID)). Where(messagerecords.SubSourceIDEQ(0)). Where(messagerecords.SendTimeGTE(lowerBound)). Where(messagerecords.SendTimeLTE(upperBound)). All(ctx) } else { messages, _ = l.svcCtx.DB.MessageRecords.Query(). Where(messagerecords.StatusEQ(3)). Where(messagerecords.SourceTypeEQ(4)). Where(messagerecords.SourceIDIn(node.ParentID)). Where(messagerecords.SubSourceIDEQ(0)). Where(messagerecords.SendTimeGTE(lowerBound)). Where(messagerecords.SendTimeLTE(upperBound)). All(ctx) } for _, s := range messages { // 判断 s.Id 是否是 s.ContactID 的最新记录 latest, _ := l.svcCtx.DB.MessageRecords.Query(). Where(messagerecords.ContactIDEQ(s.ContactID)). Where(messagerecords.StatusEQ(3)). Order(ent.Desc(messagerecords.FieldCreatedAt)). First(ctx) if latest.ID == s.ID { // 创建 MessageRecords 记录 if node.ActionMessage != nil { for i, c := range node.ActionMessage { meta := custom_types.Meta{} if c.Meta != nil { meta.Filename = c.Meta.Filename } _, _ = l.svcCtx.DB.MessageRecords.Create(). SetStatus(1). SetBotWxid(s.BotWxid). SetContactID(s.ContactID). SetContactType(s.ContactType). SetContactWxid(s.ContactWxid). SetContentType(c.Type). SetContent(c.Content). SetMeta(meta). SetSourceType(4). SetSourceID(node.ID). SetSubSourceID(uint64(i)). SetOrganizationID(s.OrganizationID). Save(ctx) } } else { meta := custom_types.Meta{} _, _ = l.svcCtx.DB.MessageRecords.Create(). SetStatus(1). SetBotWxid(s.BotWxid). SetContactID(s.ContactID). SetContactType(s.ContactType). SetContactWxid(s.ContactWxid). SetContentType(1). SetMeta(meta). SetSourceType(4). SetSourceID(node.ID). SetSubSourceID(0). SetOrganizationID(s.OrganizationID). Save(ctx) } if node.ActionForward != nil { if node.ActionForward.Wxid != "" { forwardWxids := splitString(node.ActionForward.Wxid) for _, forwardWxid := range forwardWxids { for i, message := range node.ActionForward.Action { meta := custom_types.Meta{} if message.Meta != nil { meta.Filename = message.Meta.Filename } _, _ = l.svcCtx.DB.MessageRecords.Create(). SetBotWxid(s.BotWxid). SetContactID(0). SetContactType(0). SetContactWxid(forwardWxid). SetContentType(message.Type). SetContent(message.Content). SetMeta(meta). SetSourceType(4). SetSourceID(node.ID). SetSubSourceID(s.ContactID + uint64(i)). SetOrganizationID(s.OrganizationID). Save(l.ctx) } } } } } } } finishTime := time.Now() l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String()) return } func splitString(input string) []string { // Define the regular expression pattern to match Chinese comma, English comma, and Chinese enumeration comma pattern := `[,,、]` re := regexp.MustCompile(pattern) // Split the input string based on the pattern return re.Split(input, -1) }