package MessageHandlers import ( "context" "encoding/json" "entgo.io/ent/dialect/sql" "fmt" "github.com/zeromicro/go-zero/core/logx" "strconv" "time" "wechat-api/ent" "wechat-api/ent/label" "wechat-api/ent/labellog" "wechat-api/ent/wx" "wechat-api/internal/pkg/wechat_ws" "wechat-api/internal/svc" "wechat-api/workphone" ) type ContactLabelInfoNotice struct { svcCtx *svc.ServiceContext } func NewContactLabelInfoNotice(svcCtx *svc.ServiceContext) *ContactLabelInfoNotice { return &ContactLabelInfoNotice{ svcCtx: svcCtx, } } // Handle 实现 MessageHandlerStrategy 接口 func (f *ContactLabelInfoNotice) Handle(ctx context.Context, msg *wechat_ws.MsgJsonObject, svcCtx *svc.ServiceContext) error { var message workphone.ContactLabelInfoNoticeMessage logx.Infof("msg.Message 的内容是:%s", msg.Message) if err := json.Unmarshal([]byte(msg.Message), &message); err != nil { return err } wxInfo, err := svcCtx.DB.Wx.Query(). Where(wx.WxidEQ(message.WeChatId)). Only(ctx) if err != nil { return err } // 整理标签ID和名称列表 labelIDs := make([]int, 0, len(message.Labels)) labelNameSet := make(map[string]struct{}) // 用于去重 for _, labelInfoMessage := range message.Labels { labelIDs = append(labelIDs, int(labelInfoMessage.LabelId)) labelNameSet[labelInfoMessage.LabelName] = struct{}{} } // 提前查询现有 LabelLog 和 Label existingLabelLogs, err := svcCtx.DB.LabelLog.Query(). Where( labellog.LabelIDIn(labelIDs...), labellog.OrganizationID(wxInfo.OrganizationID), labellog.WxID(message.WeChatId), ). Select(labellog.FieldLabelID). All(ctx) if err != nil { return fmt.Errorf("查询 LabelLog 失败: %w", err) } existingLabels, err := svcCtx.DB.Label.Query(). Where( label.NameIn(keysFromMap(labelNameSet)...), label.OrganizationID(wxInfo.OrganizationID), ). Select(label.FieldName). All(ctx) if err != nil { return fmt.Errorf("查询 Label 失败: %w", err) } existingLabelLogMap := make(map[int]struct{}) for _, log := range existingLabelLogs { existingLabelLogMap[log.LabelID] = struct{}{} } existingLabelMap := make(map[string]struct{}) for _, lab := range existingLabels { existingLabelMap[lab.Name] = struct{}{} } var bulkLabelLogs []*ent.LabelLogCreate var bulkLabels []*ent.LabelCreate for _, labelWx := range message.Labels { // 时间戳处理 tsInt, err := strconv.ParseInt(labelWx.CreateTime, 10, 64) if err != nil { logx.Errorf("时间戳转换失败: %v (Label ID: %d)", err, labelWx.LabelId) continue } // 插入 LabelLog if _, exists := existingLabelLogMap[int(labelWx.LabelId)]; !exists { bulkLabelLogs = append(bulkLabelLogs, svcCtx.DB.LabelLog.Create(). SetLabelName(labelWx.LabelName). SetLabelID(int(labelWx.LabelId)). SetOrganizationID(wxInfo.OrganizationID). SetWxID(message.WeChatId). SetCreatedAt(time.Unix(tsInt/1000, 0)), ) } // 插入 Label if _, exists := existingLabelMap[labelWx.LabelName]; !exists { bulkLabels = append(bulkLabels, svcCtx.DB.Label.Create(). SetName(labelWx.LabelName). SetType(1). SetStatus(1). SetOrganizationID(wxInfo.OrganizationID). SetFrom(2). SetMode(1). SetConditions(`{}`). SetCreatedAt(time.Now()). SetUpdatedAt(time.Now()), ) } } // 批量插入 LabelLog if len(bulkLabelLogs) > 0 { err := svcCtx.DB.LabelLog.CreateBulk(bulkLabelLogs...). OnConflict(sql.ConflictColumns(labellog.FieldLabelID, labellog.FieldWxID, labellog.FieldOrganizationID)). DoNothing(). Exec(ctx) if err != nil { logx.Error("labelLog 批量插入失败", bulkLabelLogs) } } // 批量插入 Label if len(bulkLabels) > 0 { err := svcCtx.DB.Label.CreateBulk(bulkLabels...). OnConflict(sql.ConflictColumns(label.FieldName, label.FieldOrganizationID)). DoNothing(). Exec(ctx) if err != nil { logx.Error("label 批量插入失败", bulkLabels) return err } } return nil } func keysFromMap(m map[string]struct{}) []string { keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } return keys }