package crontask

import (
	"context"
	"regexp"
	"time"

	"wechat-api/ent"
	"wechat-api/ent/contact"
	"wechat-api/ent/custom_types"
	"wechat-api/ent/labelrelationship"
	"wechat-api/ent/messagerecords"
	"wechat-api/ent/sopnode"
	"wechat-api/ent/sopstage"
	"wechat-api/ent/soptask"
	"wechat-api/internal/utils/dberrorhandler"
)

// forwardWxidSeparators matches the delimiters accepted between forwarding
// targets: the ASCII comma, the full-width (Chinese) comma and the Chinese
// enumeration comma. It is compiled once instead of on every splitString call.
var forwardWxidSeparators = regexp.MustCompile(`[,，、]`)

// sendWxOnTimeout scans every SOP node that has a no-reply condition and, for
// each message whose send time falls into the node's timeout window and that
// is still the contact's most recent status-3 record, queues the node's
// follow-up messages, forwards and label changes.
//
// The run is best-effort: a failure while handling one node or one message is
// logged and the remaining work still proceeds.
func (l *CronTask) sendWxOnTimeout() {
	ctx := context.Background()
	startTime := time.Now()

	// Collect the stages of all status-3, non-deleted tasks. They are needed
	// later to evaluate label-triggered stages after label changes.
	sopTasks, err := l.svcCtx.DB.SopTask.Query().
		Where(
			soptask.StatusEQ(3),
			soptask.DeletedAtIsNil(),
		).
		All(l.ctx)
	if err != nil {
		// Keep going: timeout follow-ups can still be sent without stages.
		l.Logger.Errorf("get sop task list failed %v", err)
	}
	taskIDs := make([]uint64, 0, len(sopTasks))
	for _, st := range sopTasks {
		taskIDs = append(taskIDs, st.ID)
	}

	sopStages, err := l.svcCtx.DB.SopStage.Query().
		Where(sopstage.TaskIDIn(taskIDs...), sopstage.DeletedAtIsNil()).
		All(l.ctx)
	if err != nil {
		l.Logger.Errorf("get sop stage list failed %v", err)
	}
	stageMap := make(map[uint64]*ent.SopStage, len(sopStages))
	for _, stage := range sopStages {
		stageMap[stage.ID] = stage
	}

	// Load every sop_node whose no_reply_condition is non-zero and that
	// belongs to a status-1 stage of a status-3 task (both non-deleted).
	nodes, err := l.svcCtx.DB.SopNode.Query().
		Where(sopnode.NoReplyConditionNEQ(0)).
		Where(sopnode.HasSopStageWith(
			sopstage.StatusEQ(1),
			sopstage.DeletedAtIsNil(),
			sopstage.HasSopTaskWith(
				soptask.StatusEQ(3),
				soptask.DeletedAtIsNil(),
			),
		)).
		All(ctx)
	if err != nil {
		l.Logger.Errorf("get node list failed %v", err)
		return
	}

	for _, node := range nodes {
		l.processNoReplyNode(ctx, node, startTime, stageMap)
	}

	l.Logger.Infof("This process cost %v", time.Since(startTime).String())
}

// noReplyMinutes converts a node's no-reply condition into minutes. The unit
// "W" means weeks, "D" days and "h" hours; any other unit is taken as minutes.
func noReplyMinutes(condition uint64, unit string) uint64 {
	var coef uint64 = 1
	switch unit {
	case "W":
		coef = 60 * 24 * 7
	case "D":
		coef = 60 * 24
	case "h":
		coef = 60
	}
	return condition * coef
}

// processNoReplyNode finds the messages that triggered node (sent by the
// node's stage for a root node, or by its parent node otherwise) whose send
// time lies in the two-minute window ending exactly one timeout period before
// startTime, and applies the node's actions to each of them.
func (l *CronTask) processNoReplyNode(ctx context.Context, node *ent.SopNode, startTime time.Time, stageMap map[uint64]*ent.SopStage) {
	timeout := noReplyMinutes(node.NoReplyCondition, node.NoReplyUnit)
	lowerBound := startTime.Add(-time.Minute * time.Duration(timeout+2))
	upperBound := startTime.Add(-time.Minute * time.Duration(timeout))

	query := l.svcCtx.DB.MessageRecords.Query().
		Where(messagerecords.StatusEQ(3))
	if node.ParentID == 0 {
		// Root node: the trigger is the stage's own first message.
		query = query.
			Where(messagerecords.SourceTypeEQ(3)).
			Where(messagerecords.SourceIDEQ(node.StageID))
	} else {
		// Child node: the trigger is the parent node's first message.
		query = query.
			Where(messagerecords.SourceTypeEQ(4)).
			Where(messagerecords.SourceIDIn(node.ParentID))
	}
	messages, err := query.
		Where(messagerecords.SubSourceIDEQ(0)).
		Where(messagerecords.SendTimeGTE(lowerBound)).
		Where(messagerecords.SendTimeLTE(upperBound)).
		All(ctx)
	if err != nil {
		l.Logger.Errorf("get timed-out messages for node %d failed %v", node.ID, err)
		return
	}

	for _, s := range messages {
		l.applyNoReplyActions(ctx, node, s, stageMap)
	}
}

// applyNoReplyActions runs node's actions for the trigger message s, but only
// if s is still the newest status-3 message record of its contact.
func (l *CronTask) applyNoReplyActions(ctx context.Context, node *ent.SopNode, s *ent.MessageRecords, stageMap map[uint64]*ent.SopStage) {
	// Check whether s is the latest record of s.ContactID.
	latest, err := l.svcCtx.DB.MessageRecords.Query().
		Where(messagerecords.ContactIDEQ(s.ContactID)).
		Where(messagerecords.StatusEQ(3)).
		Order(ent.Desc(messagerecords.FieldID)).
		First(ctx)
	if err != nil {
		// latest is nil here; dereferencing it would panic.
		l.Logger.Errorf("get latest message of contact %d failed %v", s.ContactID, err)
		return
	}
	if latest.ID != s.ID {
		return
	}

	// Queue the node's follow-up messages for the contact.
	if node.ActionMessage != nil {
		for i, c := range node.ActionMessage {
			meta := custom_types.Meta{}
			if c.Meta != nil {
				meta.Filename = c.Meta.Filename
			}
			_, err := l.svcCtx.DB.MessageRecords.Create().
				SetStatus(1).
				SetBotWxid(s.BotWxid).
				SetContactID(s.ContactID).
				SetContactType(s.ContactType).
				SetContactWxid(s.ContactWxid).
				SetContentType(c.Type).
				SetContent(c.Content).
				SetMeta(meta).
				SetSourceType(4).
				SetSourceID(node.ID).
				SetSubSourceID(uint64(i)).
				SetOrganizationID(s.OrganizationID).
				Save(ctx)
			if err != nil {
				l.Logger.Errorf("create message record for node %d failed %v", node.ID, err)
			}
		}
	} else {
		// No messages configured: still record a content-less marker so the
		// node counts as executed for this contact.
		_, err := l.svcCtx.DB.MessageRecords.Create().
			SetStatus(1).
			SetBotWxid(s.BotWxid).
			SetContactID(s.ContactID).
			SetContactType(s.ContactType).
			SetContactWxid(s.ContactWxid).
			SetContentType(1).
			SetMeta(custom_types.Meta{}).
			SetSourceType(4).
			SetSourceID(node.ID).
			SetSubSourceID(0).
			SetOrganizationID(s.OrganizationID).
			Save(ctx)
		if err != nil {
			l.Logger.Errorf("create message record for node %d failed %v", node.ID, err)
		}
	}

	// Queue the forwarded messages for every configured forward target.
	if node.ActionForward != nil && node.ActionForward.Wxid != "" {
		// The contact only depends on s, so look it up once for all forwards.
		contactInfo, err := l.svcCtx.DB.Contact.Query().Where(
			contact.WxWxidEQ(s.BotWxid),
			contact.WxidEQ(s.ContactWxid),
			contact.CtypeIn(1, 3),
		).First(l.ctx)
		if err != nil {
			// contactInfo stays nil and is still handed to varReplace, as
			// before. NOTE(review): confirm varReplace tolerates nil.
			l.Logger.Errorf("get contact %s for forward of node %d failed %v", s.ContactWxid, node.ID, err)
		}
		for _, forwardWxid := range splitString(node.ActionForward.Wxid) {
			for i, message := range node.ActionForward.Action {
				meta := custom_types.Meta{}
				if message.Meta != nil {
					meta.Filename = message.Meta.Filename
				}
				content := varReplace(message.Content, contactInfo)
				_, err := l.svcCtx.DB.MessageRecords.Create().
					SetBotWxid(s.BotWxid).
					SetContactID(0).
					SetContactType(0).
					SetContactWxid(forwardWxid).
					SetContentType(message.Type).
					SetContent(content).
					SetMeta(meta).
					SetSourceType(4).
					SetSourceID(node.ID).
					SetSubSourceID(s.ContactID + uint64(i)).
					SetOrganizationID(s.OrganizationID).
					Save(l.ctx)
				if err != nil {
					l.Logger.Errorf("create forward record for node %d failed %v", node.ID, err)
				}
			}
		}
	}

	if node.ActionLabelAdd == nil && node.ActionLabelDel == nil {
		return
	}

	// Load the contact's current label relationships.
	currentLabelRelationships, err := l.svcCtx.DB.LabelRelationship.Query().
		Where(labelrelationship.ContactID(s.ContactID)).
		All(l.ctx)
	if err != nil {
		l.Logger.Errorf("get label relationships of contact %d failed %v", s.ContactID, err)
		return
	}
	if currentLabelRelationships == nil {
		return
	}
	currentLabelIds := make([]uint64, 0, len(currentLabelRelationships))
	for _, relationship := range currentLabelRelationships {
		currentLabelIds = append(currentLabelIds, relationship.LabelID)
	}

	// Type must be the contact type: AddLabelRelationships writes it back as
	// the contact_type of the records it creates.
	target := ent.Contact{
		ID:     s.ContactID,
		Type:   s.ContactType,
		WxWxid: s.BotWxid,
		Wxid:   s.ContactWxid,
	}

	// AddLabelRelationships nils out entries of the map it receives to guard
	// its recursion, so every contact gets its own copy; otherwise a stage
	// triggered once would be disabled for all later contacts of this run.
	stagesForContact := make(map[uint64]*ent.SopStage, len(stageMap))
	for id, stage := range stageMap {
		stagesForContact[id] = stage
	}
	if err := l.AddLabelRelationships(stagesForContact, target, currentLabelIds, node.ActionLabelAdd, node.ActionLabelDel, s.OrganizationID); err != nil {
		l.Logger.Errorf("update labels of contact %d failed %v", s.ContactID, err)
	}
}

// splitString splits input on the ASCII comma, the full-width (Chinese) comma
// and the Chinese enumeration comma. An input without any separator yields a
// single-element slice containing input itself.
func splitString(input string) []string {
	return forwardWxidSeparators.Split(input, -1)
}

// AddLabelRelationships applies label additions and removals to contact and
// then fires every label-conditioned stage in sopStages that the resulting
// label set matches.
//
// Labels in addLabelIds are created unless the contact already has them or
// they are also listed in delLabelIds; labels in delLabelIds are removed if
// present. When nothing changed the function returns nil without evaluating
// any stage. For every non-nil stage with ConditionType 1 whose filter
// matches and for which no message record (source type 3, sub source 0)
// exists yet for the contact's wxid, the stage's messages and forwards are
// queued; if the stage itself changes labels the function recurses.
//
// sopStages is mutated: a stage is set to nil before recursing into its label
// actions so it cannot be triggered again further down the chain. Callers
// that reuse the map across contacts should pass a copy.
//
// The steps are not wrapped in a transaction, so earlier changes persist when
// a later step fails. The returned error is the result of
// dberrorhandler.DefaultEntError for the failing label create or delete.
func (l *CronTask) AddLabelRelationships(sopStages map[uint64]*ent.SopStage, contact ent.Contact, currentLabelIds []uint64, addLabelIds []uint64, delLabelIds []uint64, organizationId uint64) (err error) {
	var newLabelIds, remLabelIds []uint64

	// Sets for constant-time membership tests.
	currentLabelIdSet := make(map[uint64]struct{}, len(currentLabelIds))
	for _, id := range currentLabelIds {
		currentLabelIdSet[id] = struct{}{}
	}
	delLabelIdSet := make(map[uint64]struct{}, len(delLabelIds))
	for _, id := range delLabelIds {
		delLabelIdSet[id] = struct{}{}
	}

	// Labels to create: requested, not yet present and not scheduled for
	// removal. Recording them in the set right away also drops duplicates in
	// addLabelIds.
	for _, id := range addLabelIds {
		if _, present := currentLabelIdSet[id]; present {
			continue
		}
		if _, removed := delLabelIdSet[id]; removed {
			continue
		}
		currentLabelIdSet[id] = struct{}{}
		newLabelIds = append(newLabelIds, id)
	}
	for _, id := range newLabelIds {
		_, createErr := l.svcCtx.DB.LabelRelationship.Create().
			SetLabelID(id).
			SetContactID(contact.ID).
			SetOrganizationID(organizationId).
			Save(l.ctx)
		if createErr != nil {
			return dberrorhandler.DefaultEntError(l.Logger, createErr, nil)
		}
	}

	// Labels to remove: requested and currently present.
	for _, id := range delLabelIds {
		if _, present := currentLabelIdSet[id]; present {
			remLabelIds = append(remLabelIds, id)
			delete(currentLabelIdSet, id)
		}
	}
	if len(remLabelIds) > 0 {
		_, delErr := l.svcCtx.DB.LabelRelationship.Delete().
			Where(
				labelrelationship.LabelIDIn(remLabelIds...),
				labelrelationship.ContactIDEQ(contact.ID),
				labelrelationship.OrganizationIDEQ(organizationId),
			).
			Exec(l.ctx)
		if delErr != nil {
			return dberrorhandler.DefaultEntError(l.Logger, delErr, nil)
		}
	}

	if len(newLabelIds) == 0 && len(remLabelIds) == 0 {
		return nil
	}

	finalLabelIds := make([]uint64, 0, len(currentLabelIdSet))
	for id := range currentLabelIdSet {
		finalLabelIds = append(finalLabelIds, id)
	}

	// Fire every stage whose label condition matches the new label set.
	for key, stage := range sopStages {
		if stage == nil || stage.ConditionType != 1 {
			continue
		}
		if !isLabelIdListMatchFilter(finalLabelIds, stage.ConditionOperator, stage.ConditionList) {
			continue
		}

		// Skip the stage unless no record with the same contact_wxid,
		// source_type, source_id and sub_source_id exists. Any outcome other
		// than "not found" (a hit, several hits, a query error) skips it.
		_, findErr := l.svcCtx.DB.MessageRecords.Query().
			Where(
				messagerecords.ContactWxid(contact.Wxid),
				messagerecords.SourceType(3),
				messagerecords.SourceID(stage.ID),
				messagerecords.SubSourceID(0),
			).
			Only(l.ctx)
		if !ent.IsNotFound(findErr) {
			continue
		}

		sourceType := 3
		if stage.ActionMessage != nil {
			for i, message := range stage.ActionMessage {
				meta := custom_types.Meta{}
				if message.Meta != nil {
					meta.Filename = message.Meta.Filename
				}
				_, saveErr := l.svcCtx.DB.MessageRecords.Create().
					SetNotNilBotWxid(&contact.WxWxid).
					SetNotNilContactID(&contact.ID).
					SetNotNilContactType(&contact.Type).
					SetNotNilContactWxid(&contact.Wxid).
					SetNotNilContentType(&message.Type).
					SetNotNilContent(&message.Content).
					SetMeta(meta).
					SetNotNilSourceType(&sourceType).
					SetNotNilSourceID(&stage.ID).
					SetSubSourceID(uint64(i)).
					SetOrganizationID(organizationId).
					Save(l.ctx)
				if saveErr != nil {
					// Best-effort: log and keep processing.
					l.Logger.Errorf("create message record for stage %d failed %v", stage.ID, saveErr)
				}
			}
		}

		if stage.ActionForward != nil && stage.ActionForward.Wxid != "" {
			for _, forwardWxid := range splitString(stage.ActionForward.Wxid) {
				for i, message := range stage.ActionForward.Action {
					meta := custom_types.Meta{}
					if message.Meta != nil {
						meta.Filename = message.Meta.Filename
					}
					_, saveErr := l.svcCtx.DB.MessageRecords.Create().
						SetBotWxid(contact.WxWxid).
						SetContactID(0).
						SetContactType(0).
						SetContactWxid(forwardWxid).
						SetContentType(message.Type).
						SetContent(message.Content).
						SetMeta(meta).
						SetSourceType(sourceType).
						SetSourceID(stage.ID).
						SetSubSourceID(contact.ID + uint64(i)).
						SetOrganizationID(organizationId).
						Save(l.ctx)
					if saveErr != nil {
						// Best-effort: log and keep processing.
						l.Logger.Errorf("create forward record for stage %d failed %v", stage.ID, saveErr)
					}
				}
			}
		}

		if stage.ActionLabelAdd != nil || stage.ActionLabelDel != nil {
			// Mark the stage as consumed before recursing so a chain of
			// label changes cannot trigger it again.
			sopStages[key] = nil
			if recErr := l.AddLabelRelationships(sopStages, contact, finalLabelIds, stage.ActionLabelAdd, stage.ActionLabelDel, organizationId); recErr != nil {
				return recErr
			}
		}
	}

	return nil
}

// isLabelIdListMatchFilter reports whether labelIdList satisfies conditionList.
//
// A condition matches when labelIdList contains at least one id of its
// LabelIdList; Equal == 2 inverts that result. conditionOperator 1 requires
// every condition to match (true for an empty list), conditionOperator 2
// requires at least one to match. Any other operator yields false.
func isLabelIdListMatchFilter(labelIdList []uint64, conditionOperator int, conditionList []custom_types.Condition) bool {
	labelIdSet := make(map[uint64]struct{}, len(labelIdList))
	for _, id := range labelIdList {
		labelIdSet[id] = struct{}{}
	}

	for _, condition := range conditionList {
		match := false
		for _, id := range condition.LabelIdList {
			if _, ok := labelIdSet[id]; ok {
				match = true
				break
			}
		}
		if condition.Equal == 2 {
			match = !match
		}

		switch {
		case conditionOperator == 1 && !match:
			return false
		case conditionOperator == 2 && match:
			return true
		}
	}

	return conditionOperator == 1
}