|
@@ -4,10 +4,13 @@ import (
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"entgo.io/ent/dialect/sql"
|
|
|
+ "fmt"
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
"strconv"
|
|
|
"time"
|
|
|
"wechat-api/ent"
|
|
|
+ "wechat-api/ent/label"
|
|
|
+ "wechat-api/ent/labellog"
|
|
|
"wechat-api/ent/wx"
|
|
|
"wechat-api/internal/pkg/wechat_ws"
|
|
|
"wechat-api/internal/svc"
|
|
@@ -26,94 +29,129 @@ func NewContactLabelInfoNotice(svcCtx *svc.ServiceContext) *ContactLabelInfoNoti
|
|
|
|
|
|
// Handle 实现 MessageHandlerStrategy 接口
|
|
|
func (f *ContactLabelInfoNotice) Handle(ctx context.Context, msg *wechat_ws.MsgJsonObject, svcCtx *svc.ServiceContext) error {
|
|
|
- message := workphone.ContactLabelInfoNoticeMessage{}
|
|
|
+ var message workphone.ContactLabelInfoNoticeMessage
|
|
|
logx.Infof("msg.Message 的内容是:%s", msg.Message)
|
|
|
- err := json.Unmarshal([]byte(msg.Message), &message)
|
|
|
- if err != nil {
|
|
|
+ if err := json.Unmarshal([]byte(msg.Message), &message); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- // 拿到租户 id
|
|
|
+
|
|
|
wxInfo, err := svcCtx.DB.Wx.Query().
|
|
|
- Where(
|
|
|
- wx.WxidEQ(message.WeChatId), // Additional filter by organizationId
|
|
|
- ).
|
|
|
+ Where(wx.WxidEQ(message.WeChatId)).
|
|
|
Only(ctx)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- var bulkCreates []*ent.LabelLogCreate
|
|
|
- var labelCreates []*ent.LabelCreate
|
|
|
|
|
|
- for _, label := range message.Labels {
|
|
|
+ // 整理标签ID和名称列表
|
|
|
+ labelIDs := make([]int, 0, len(message.Labels))
|
|
|
+ labelNameSet := make(map[string]struct{}) // 用于去重
|
|
|
+ for _, labelInfoMessage := range message.Labels {
|
|
|
+ labelIDs = append(labelIDs, int(labelInfoMessage.LabelId))
|
|
|
+ labelNameSet[labelInfoMessage.LabelName] = struct{}{}
|
|
|
+ }
|
|
|
|
|
|
- labelIDSet := make(map[int]struct{})
|
|
|
- labelIDSet[int(label.LabelId)] = struct{}{}
|
|
|
- var labelIDs []int
|
|
|
- for id := range labelIDSet {
|
|
|
- labelIDs = append(labelIDs, id)
|
|
|
- }
|
|
|
- existingMap := make(map[int]struct{})
|
|
|
- // 只拼接还没插入过的 label_id + organization_id 组合
|
|
|
- if _, exists := existingMap[int(label.LabelId)]; exists {
|
|
|
- logx.Error("label_log---已经存在的: ", wxInfo.OrganizationID, label.LabelId)
|
|
|
- continue
|
|
|
- }
|
|
|
- tsInt, err := strconv.ParseInt(label.CreateTime, 10, 64)
|
|
|
+ // 提前查询现有 LabelLog 和 Label
|
|
|
+ existingLabelLogs, err := svcCtx.DB.LabelLog.Query().
|
|
|
+ Where(
|
|
|
+ labellog.LabelIDIn(labelIDs...),
|
|
|
+ labellog.OrganizationID(wxInfo.OrganizationID),
|
|
|
+ labellog.WxID(message.WeChatId),
|
|
|
+ ).
|
|
|
+ Select(labellog.FieldLabelID).
|
|
|
+ All(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("查询 LabelLog 失败: %w", err)
|
|
|
+ }
|
|
|
+ existingLabels, err := svcCtx.DB.Label.Query().
|
|
|
+ Where(
|
|
|
+ label.NameIn(keysFromMap(labelNameSet)...),
|
|
|
+ label.OrganizationID(wxInfo.OrganizationID),
|
|
|
+ ).
|
|
|
+ Select(label.FieldName).
|
|
|
+ All(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return fmt.Errorf("查询 Label 失败: %w", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ existingLabelLogMap := make(map[int]struct{})
|
|
|
+ for _, log := range existingLabelLogs {
|
|
|
+ existingLabelLogMap[log.LabelID] = struct{}{}
|
|
|
+ }
|
|
|
+ existingLabelMap := make(map[string]struct{})
|
|
|
+ for _, lab := range existingLabels {
|
|
|
+ existingLabelMap[lab.Name] = struct{}{}
|
|
|
+ }
|
|
|
+
|
|
|
+ var bulkLabelLogs []*ent.LabelLogCreate
|
|
|
+ var bulkLabels []*ent.LabelCreate
|
|
|
+
|
|
|
+ for _, labelWx := range message.Labels {
|
|
|
+ // 时间戳处理
|
|
|
+ tsInt, err := strconv.ParseInt(labelWx.CreateTime, 10, 64)
|
|
|
if err != nil {
|
|
|
- logx.Errorf("时间戳转换失败: %v", err)
|
|
|
+ logx.Errorf("时间戳转换失败: %v (Label ID: %d)", err, labelWx.LabelId)
|
|
|
continue
|
|
|
}
|
|
|
- bulkCreates = append(bulkCreates,
|
|
|
- svcCtx.DB.LabelLog.Create().
|
|
|
- SetLabelName(label.LabelName).
|
|
|
- SetLabelID(int(label.LabelId)).
|
|
|
- SetOrganizationID(wxInfo.OrganizationID).
|
|
|
- SetWxID(message.WeChatId).
|
|
|
- SetCreatedAt(time.Unix(tsInt/1000, 0)),
|
|
|
- //SetUpdatedAt(time.Now()),
|
|
|
- )
|
|
|
-
|
|
|
- //label 主表
|
|
|
- labelCreates = append(labelCreates,
|
|
|
- svcCtx.DB.Label.Create().
|
|
|
- //SetID(int(label.LabelId)).
|
|
|
- SetName(label.LabelName).
|
|
|
- SetType(1).
|
|
|
- SetStatus(1).
|
|
|
- SetOrganizationID(wxInfo.OrganizationID).
|
|
|
- SetFrom(2).
|
|
|
- SetMode(1).
|
|
|
- SetConditions(string(json.RawMessage(`{}`))).
|
|
|
- SetCreatedAt(time.Now()).
|
|
|
- SetUpdatedAt(time.Now()),
|
|
|
- )
|
|
|
- logx.Info("数据:", label.LabelName+"-----", label.LabelId, wxInfo.OrganizationID, message.WeChatId, time.Unix(tsInt/1000, 0))
|
|
|
+
|
|
|
+ // 插入 LabelLog
|
|
|
+ if _, exists := existingLabelLogMap[int(labelWx.LabelId)]; !exists {
|
|
|
+ bulkLabelLogs = append(bulkLabelLogs,
|
|
|
+ svcCtx.DB.LabelLog.Create().
|
|
|
+ SetLabelName(labelWx.LabelName).
|
|
|
+ SetLabelID(int(labelWx.LabelId)).
|
|
|
+ SetOrganizationID(wxInfo.OrganizationID).
|
|
|
+ SetWxID(message.WeChatId).
|
|
|
+ SetCreatedAt(time.Unix(tsInt/1000, 0)),
|
|
|
+ )
|
|
|
+ }
|
|
|
+
|
|
|
+ // 插入 Label
|
|
|
+ if _, exists := existingLabelMap[labelWx.LabelName]; !exists {
|
|
|
+ bulkLabels = append(bulkLabels,
|
|
|
+ svcCtx.DB.Label.Create().
|
|
|
+ SetName(labelWx.LabelName).
|
|
|
+ SetType(1).
|
|
|
+ SetStatus(1).
|
|
|
+ SetOrganizationID(wxInfo.OrganizationID).
|
|
|
+ SetFrom(2).
|
|
|
+ SetMode(1).
|
|
|
+ SetConditions(`{}`).
|
|
|
+ SetCreatedAt(time.Now()).
|
|
|
+ SetUpdatedAt(time.Now()),
|
|
|
+ )
|
|
|
+ }
|
|
|
}
|
|
|
- //批量插入labelLog
|
|
|
- if len(bulkCreates) > 0 {
|
|
|
- err := svcCtx.DB.LabelLog.CreateBulk(bulkCreates...).
|
|
|
- OnConflict(
|
|
|
- sql.ConflictColumns("label_id", "organization_id"),
|
|
|
- ).
|
|
|
+
|
|
|
+ // 批量插入 LabelLog
|
|
|
+ if len(bulkLabelLogs) > 0 {
|
|
|
+ err := svcCtx.DB.LabelLog.CreateBulk(bulkLabelLogs...).
|
|
|
+ OnConflict(sql.ConflictColumns("label_id", "organization_id")).
|
|
|
DoNothing().
|
|
|
Exec(ctx)
|
|
|
if err != nil {
|
|
|
- logx.Error("labelLog 批量插入失败", bulkCreates)
|
|
|
- //return err
|
|
|
+ logx.Error("labelLog 批量插入失败", bulkLabelLogs)
|
|
|
}
|
|
|
}
|
|
|
- //批量插入label
|
|
|
- if len(labelCreates) > 0 {
|
|
|
- err := svcCtx.DB.Label.CreateBulk(labelCreates...).
|
|
|
- OnConflict(
|
|
|
- sql.ConflictColumns("name", "organization_id"),
|
|
|
- ).
|
|
|
+
|
|
|
+ // 批量插入 Label
|
|
|
+ if len(bulkLabels) > 0 {
|
|
|
+ err := svcCtx.DB.Label.CreateBulk(bulkLabels...).
|
|
|
+ OnConflict(sql.ConflictColumns("name", "organization_id")).
|
|
|
DoNothing().
|
|
|
Exec(ctx)
|
|
|
if err != nil {
|
|
|
- logx.Error("label 批量插入失败", labelCreates)
|
|
|
+ logx.Error("label 批量插入失败", bulkLabels)
|
|
|
return err
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
+func keysFromMap(m map[string]struct{}) []string {
|
|
|
+ keys := make([]string, 0, len(m))
|
|
|
+ for k := range m {
|
|
|
+ keys = append(keys, k)
|
|
|
+ }
|
|
|
+ return keys
|
|
|
+}
|