123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- package crontask
- import (
- "context"
- "regexp"
- "time"
- "wechat-api/ent"
- "wechat-api/ent/contact"
- "wechat-api/ent/custom_types"
- "wechat-api/ent/labelrelationship"
- "wechat-api/ent/messagerecords"
- "wechat-api/ent/sopnode"
- "wechat-api/ent/sopstage"
- "wechat-api/ent/soptask"
- "wechat-api/internal/utils/dberrorhandler"
- )
- func (l *CronTask) sendWxOnTimeout() {
- ctx := context.Background()
- startTime := time.Now()
- sopTask, _ := l.svcCtx.DB.SopTask.Query().
- Where(
- soptask.StatusEQ(3),
- soptask.DeletedAtIsNil(),
- ).
- All(l.ctx)
- sopTasks := make([]uint64, 0)
- for _, st := range sopTask {
- sopTasks = append(sopTasks, st.ID)
- }
- sopStages, _ := l.svcCtx.DB.SopStage.Query().Where(sopstage.TaskIDIn(sopTasks...), sopstage.DeletedAtIsNil()).All(l.ctx)
- stageMap := make(map[uint64]*ent.SopStage)
- for _, stage := range sopStages {
- stageMap[stage.ID] = stage
- }
- // 查询所有 no_reply_condition 值非 0 的 sop_node 记录
- nodes, err := l.svcCtx.DB.SopNode.Query().
- Where(sopnode.NoReplyConditionNEQ(0)).
- Where(sopnode.HasSopStageWith(
- sopstage.StatusEQ(1),
- sopstage.DeletedAtIsNil(),
- sopstage.HasSopTaskWith(
- soptask.StatusEQ(3),
- soptask.DeletedAtIsNil(),
- ),
- )).
- All(ctx)
- if err != nil {
- l.Logger.Errorf("get node list failed %v", err)
- return
- }
- // 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
- //parentNodes := make([]uint64, 0)
- //stages := make([]uint64, 0)
- messages := make([]*ent.MessageRecords, 0)
- for _, node := range nodes {
- var coef uint64 = 1
- switch node.NoReplyUnit {
- case "W":
- coef = 60 * 24 * 7
- case "D":
- coef = 60 * 24
- case "h":
- coef = 60
- }
- // 查询 node 对应的 stage 记录
- lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef+2))
- upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition*coef))
- if node.ParentID == 0 {
- messages, _ = l.svcCtx.DB.MessageRecords.Query().
- Where(messagerecords.StatusEQ(3)).
- Where(messagerecords.SourceTypeEQ(3)).
- Where(messagerecords.SourceIDEQ(node.StageID)).
- Where(messagerecords.SubSourceIDEQ(0)).
- Where(messagerecords.SendTimeGTE(lowerBound)).
- Where(messagerecords.SendTimeLTE(upperBound)).
- All(ctx)
- } else {
- messages, _ = l.svcCtx.DB.MessageRecords.Query().
- Where(messagerecords.StatusEQ(3)).
- Where(messagerecords.SourceTypeEQ(4)).
- Where(messagerecords.SourceIDIn(node.ParentID)).
- Where(messagerecords.SubSourceIDEQ(0)).
- Where(messagerecords.SendTimeGTE(lowerBound)).
- Where(messagerecords.SendTimeLTE(upperBound)).
- All(ctx)
- }
- for _, s := range messages {
- // 判断 s.Id 是否是 s.ContactID 的最新记录
- latest, _ := l.svcCtx.DB.MessageRecords.Query().
- Where(messagerecords.ContactIDEQ(s.ContactID)).
- Where(messagerecords.StatusEQ(3)).
- Order(ent.Desc(messagerecords.FieldID)).
- First(ctx)
- if latest.ID == s.ID {
- // 创建 MessageRecords 记录
- if node.ActionMessage != nil {
- for i, c := range node.ActionMessage {
- meta := custom_types.Meta{}
- if c.Meta != nil {
- meta.Filename = c.Meta.Filename
- }
- _, _ = l.svcCtx.DB.MessageRecords.Create().
- SetStatus(1).
- SetBotWxid(s.BotWxid).
- SetContactID(s.ContactID).
- SetContactType(s.ContactType).
- SetContactWxid(s.ContactWxid).
- SetContentType(c.Type).
- SetContent(c.Content).
- SetMeta(meta).
- SetSourceType(4).
- SetSourceID(node.ID).
- SetSubSourceID(uint64(i)).
- SetOrganizationID(s.OrganizationID).
- Save(ctx)
- }
- } else {
- meta := custom_types.Meta{}
- _, _ = l.svcCtx.DB.MessageRecords.Create().
- SetStatus(1).
- SetBotWxid(s.BotWxid).
- SetContactID(s.ContactID).
- SetContactType(s.ContactType).
- SetContactWxid(s.ContactWxid).
- SetContentType(1).
- SetMeta(meta).
- SetSourceType(4).
- SetSourceID(node.ID).
- SetSubSourceID(0).
- SetOrganizationID(s.OrganizationID).
- Save(ctx)
- }
- if node.ActionForward != nil {
- if node.ActionForward.Wxid != "" {
- forwardWxids := splitString(node.ActionForward.Wxid)
- for _, forwardWxid := range forwardWxids {
- for i, message := range node.ActionForward.Action {
- meta := custom_types.Meta{}
- if message.Meta != nil {
- meta.Filename = message.Meta.Filename
- }
- contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
- contact.WxWxidEQ(s.BotWxid),
- contact.WxidEQ(s.ContactWxid),
- contact.Ctype(1),
- ).First(l.ctx)
- content := varReplace(message.Content, contactInfo)
- _, _ = l.svcCtx.DB.MessageRecords.Create().
- SetBotWxid(s.BotWxid).
- SetContactID(0).
- SetContactType(0).
- SetContactWxid(forwardWxid).
- SetContentType(message.Type).
- SetContent(content).
- SetMeta(meta).
- SetSourceType(4).
- SetSourceID(node.ID).
- SetSubSourceID(s.ContactID + uint64(i)).
- SetOrganizationID(s.OrganizationID).
- Save(l.ctx)
- }
- }
- }
- }
- // 查询当前联系人的标签关系
- currentLabelRelationships, _ := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.ContactID(s.ContactID)).All(l.ctx)
- if currentLabelRelationships == nil {
- continue
- }
- // 提取当前标签ID
- var currentLabelIds []uint64
- for _, relationship := range currentLabelRelationships {
- currentLabelIds = append(currentLabelIds, relationship.LabelID)
- }
- contact := &ent.Contact{
- ID: s.ContactID,
- Type: s.ContentType,
- WxWxid: s.BotWxid,
- Wxid: s.ContactWxid,
- }
- if node.ActionLabelAdd != nil || node.ActionLabelDel != nil {
- _ = l.AddLabelRelationships(stageMap, *contact, currentLabelIds, node.ActionLabelAdd, node.ActionLabelDel, s.OrganizationID)
- }
- }
- }
- }
- finishTime := time.Now()
- l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
- return
- }
// wxidSeparatorRe matches the separators accepted between wxids: the ASCII
// comma, the full-width (Chinese) comma U+FF0C and the Chinese enumeration
// comma U+3001. The non-ASCII separators are written as escapes so they cannot
// be silently normalised to a plain comma by editors or tooling. It is
// compiled once instead of on every call.
var wxidSeparatorRe = regexp.MustCompile(`[,\x{FF0C}\x{3001}]`)

// splitString splits input on ASCII commas, Chinese commas and Chinese
// enumeration commas and returns the non-empty segments in order.
//
// Empty segments (from leading, trailing or doubled separators, or an empty
// input) are dropped so callers never receive an empty wxid. The result is
// never nil.
func splitString(input string) []string {
	raw := wxidSeparatorRe.Split(input, -1)
	parts := make([]string, 0, len(raw))
	for _, part := range raw {
		if part != "" {
			parts = append(parts, part)
		}
	}
	return parts
}
- func (l *CronTask) AddLabelRelationships(sopStages map[uint64]*ent.SopStage, contact ent.Contact, currentLabelIds []uint64, addLabelIds []uint64, delLabelIds []uint64, organizationId uint64) (err error) {
- //// 开始事务
- //tx, err := l.svcCtx.DB.Tx(context.Background())
- //if err != nil {
- // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
- //}
- // 获取 addLabelIds 中不在 currentLabelIds 中的标签ID
- var newLabelIds []uint64
- var remLabelIds []uint64
- var finalLabelIds []uint64
- // 创建一个映射,用于快速查找 currentLabelIds 中的元素
- currentLabelIdSet := make(map[uint64]struct{})
- for _, id := range currentLabelIds {
- currentLabelIdSet[id] = struct{}{}
- }
- delLabelIdSet := make(map[uint64]struct{})
- for _, id := range delLabelIds {
- delLabelIdSet[id] = struct{}{}
- }
- if addLabelIds != nil {
- // 遍历 addLabelIds,找出不在 currentLabelIds 中的元素
- for _, id := range addLabelIds {
- if _, ce := currentLabelIdSet[id]; !ce {
- if _, re := delLabelIdSet[id]; !re {
- newLabelIds = append(newLabelIds, id)
- }
- }
- }
- if len(newLabelIds) > 0 {
- // 创建需要新增的标签关系
- for _, id := range newLabelIds {
- currentLabelIdSet[id] = struct{}{}
- _, err = l.svcCtx.DB.LabelRelationship.Create().
- SetLabelID(id).
- SetContactID(contact.ID).
- SetOrganizationID(organizationId).
- Save(l.ctx)
- if err != nil {
- //_ = tx.Rollback()
- return dberrorhandler.DefaultEntError(l.Logger, err, nil)
- }
- }
- // 合并 currentLabelIds 和 newLabelIds
- currentLabelIds = append(currentLabelIds, newLabelIds...)
- }
- }
- if delLabelIds != nil {
- // 遍历 delLabelIds,找出在 currentLabelIds 中的元素
- for _, id := range delLabelIds {
- if _, exists := currentLabelIdSet[id]; exists {
- remLabelIds = append(remLabelIds, id)
- delete(currentLabelIdSet, id)
- }
- }
- if len(remLabelIds) > 0 {
- _, err = l.svcCtx.DB.LabelRelationship.Delete().Where(labelrelationship.LabelIDIn(remLabelIds...), labelrelationship.ContactIDEQ(contact.ID), labelrelationship.OrganizationIDEQ(organizationId)).Exec(l.ctx)
- if err != nil {
- //_ = tx.Rollback()
- return dberrorhandler.DefaultEntError(l.Logger, err, nil)
- }
- }
- }
- if len(newLabelIds) == 0 && len(remLabelIds) == 0 {
- return nil
- }
- for id := range currentLabelIdSet {
- finalLabelIds = append(finalLabelIds, id)
- }
- // 遍历 sop_stages,找出满足条件的 stage
- for key, stage := range sopStages {
- if stage != nil && stage.ConditionType == 1 && isLabelIdListMatchFilter(finalLabelIds, stage.ConditionOperator, stage.ConditionList) {
- // 判断是否有 contact_wxid、source_type、source_id、sub_source_id 相同的记录
- _, err := l.svcCtx.DB.MessageRecords.Query().
- Where(
- messagerecords.ContactWxid(contact.Wxid),
- messagerecords.SourceType(3),
- messagerecords.SourceID(stage.ID),
- messagerecords.SubSourceID(0),
- ).
- Only(l.ctx)
- if !ent.IsNotFound(err) {
- continue
- }
- // 判断ActionMessage是否为空
- sourceType := 3
- if stage.ActionMessage != nil {
- for i, message := range stage.ActionMessage {
- meta := custom_types.Meta{}
- if message.Meta != nil {
- meta.Filename = message.Meta.Filename
- }
- _, _ = l.svcCtx.DB.MessageRecords.Create().
- SetNotNilBotWxid(&contact.WxWxid).
- SetNotNilContactID(&contact.ID).
- SetNotNilContactType(&contact.Type).
- SetNotNilContactWxid(&contact.Wxid).
- SetNotNilContentType(&message.Type).
- SetNotNilContent(&message.Content).
- SetMeta(meta).
- SetNotNilSourceType(&sourceType).
- SetNotNilSourceID(&stage.ID).
- SetSubSourceID(uint64(i)).
- SetOrganizationID(organizationId).
- Save(l.ctx)
- //if err != nil {
- // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
- //}
- }
- }
- if stage.ActionForward != nil {
- if stage.ActionForward.Wxid != "" {
- forwardWxids := splitString(stage.ActionForward.Wxid)
- for _, forwardWxid := range forwardWxids {
- for i, message := range stage.ActionForward.Action {
- meta := custom_types.Meta{}
- if message.Meta != nil {
- meta.Filename = message.Meta.Filename
- }
- _, err = l.svcCtx.DB.MessageRecords.Create().
- SetBotWxid(contact.WxWxid).
- SetContactID(0).
- SetContactType(0).
- SetContactWxid(forwardWxid).
- SetContentType(message.Type).
- SetContent(message.Content).
- SetMeta(meta).
- SetSourceType(sourceType).
- SetSourceID(stage.ID).
- SetSubSourceID(contact.ID + uint64(i)).
- SetOrganizationID(organizationId).
- Save(l.ctx)
- }
- }
- }
- }
- if stage.ActionLabelAdd != nil || stage.ActionLabelDel != nil {
- // 递归调用 AddLabelRelationships
- sopStages[key] = nil
- err = l.AddLabelRelationships(sopStages, contact, finalLabelIds, stage.ActionLabelAdd, stage.ActionLabelDel, organizationId)
- if err != nil {
- //_ = tx.Rollback()
- return err
- }
- }
- }
- }
- // 所有操作成功,提交事务
- //err = tx.Commit()
- //if err != nil {
- // return dberrorhandler.DefaultEntError(l.Logger, err, nil)
- //}
- return nil
- }
- func isLabelIdListMatchFilter(labelIdList []uint64, conditionOperator int, conditionList []custom_types.Condition) bool {
- labelIdSet := make(map[uint64]struct{})
- for _, id := range labelIdList {
- labelIdSet[id] = struct{}{}
- }
- for _, condition := range conditionList {
- match := false
- for _, id := range condition.LabelIdList {
- if _, ok := labelIdSet[id]; ok {
- match = true
- break
- }
- }
- if condition.Equal == 2 {
- match = !match
- }
- if conditionOperator == 1 && !match {
- return false
- } else if conditionOperator == 2 && match {
- return true
- }
- }
- return conditionOperator == 1
- }
|