package batch_msg

import (
	"context"
	"encoding/json"
	"errors"
	"strings"
	"time"

	"wechat-api/ent/custom_types"

	"wechat-api/ent"
	"wechat-api/ent/contact"
	"wechat-api/ent/label"
	"wechat-api/ent/labelrelationship"
	"wechat-api/internal/svc"
	"wechat-api/internal/types"
	"wechat-api/internal/utils/dberrorhandler"

	"github.com/suyuan32/simple-admin-common/msg/errormsg"
	"github.com/suyuan32/simple-admin-common/utils/uuidx"
	"github.com/zeromicro/go-zero/core/logx"
)

// CreateBatchMsgLogic holds the request context, the service context and a
// context-bound logger used while creating a batch message.
type CreateBatchMsgLogic struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
	logx.Logger
}

// NewCreateBatchMsgLogic builds a CreateBatchMsgLogic whose logger is bound to ctx.
func NewCreateBatchMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateBatchMsgLogic {
	return &CreateBatchMsgLogic{
		ctx:    ctx,
		svcCtx: svcCtx,
		Logger: logx.WithContext(ctx),
	}
}

// CreateBatchMsg creates one batch_msg row plus one message_records row per
// (recipient, message) pair.
//
// Recipients are the contacts of type 1 or 2 whose wx_wxid equals
// req.Fromwxid; they are either all such contacts (when req.Labels contains
// "all" or "全部", case-insensitively) or only those attached to the labels
// named in req.Labels.
//
// req.StartTimeStr and req.SendTimeStr, when non-empty, must use the layout
// "2006-01-02 15:04:05". A missing start time falls back to time.Now(); a
// missing send time stays the zero time.Time. req.Msg, when non-empty, must be
// a JSON array of custom_types.Action. req.Tag is overwritten with the
// comma-joined labels.
//
// It returns errormsg.TargetNotFound (with a nil error) when no recipient
// matches, and errormsg.CreateSuccess once every row has been committed. All
// inserts run in a single transaction, so a failure leaves no partial batch
// behind.
func (l *CreateBatchMsgLogic) CreateBatchMsg(req *types.BatchMsgInfo) (*types.BaseMsgResp, error) {
	sendToAll := false
	for _, name := range req.Labels {
		if strings.EqualFold(name, "all") || strings.EqualFold(name, "全部") {
			sendToAll = true
			break
		}
	}

	l.Logger.Infof("CreateBatchMsgLogic: %v", *req)

	// The start time defaults to "now" unless the caller supplied one.
	startTime, err := l.parseOptionalTime(req.StartTimeStr, time.Now())
	if err != nil {
		return nil, err
	}

	// Scheduled send time; stays the zero value when not supplied.
	sendTime, err := l.parseOptionalTime(req.SendTimeStr, time.Time{})
	if err != nil {
		return nil, err
	}

	actions, err := l.decodeActions(req.Msg)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the tag is always the joined label list, even when an
	// "all" label was found — this preserves the previous effective behavior,
	// where an earlier assignment of "all" was immediately overwritten here.
	tag := strings.Join(req.Labels, ",")
	req.Tag = &tag

	// Guard against a nil pointer before it is dereferenced in the queries.
	if req.Fromwxid == nil {
		return nil, errors.New("fromwxid is required")
	}

	recipients, err := l.findRecipients(req, sendToAll)
	if err != nil {
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}

	total := int32(len(recipients))
	if total == 0 {
		return &types.BaseMsgResp{Msg: errormsg.TargetNotFound}, nil
	}

	batchNo := uuidx.NewUUID().String()

	// Begin the transaction; every write below must go through tx so that
	// Rollback/Commit actually govern it.
	tx, err := l.svcCtx.DB.Tx(l.ctx)
	if err != nil {
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}

	_, err = tx.BatchMsg.Create().
		SetNotNilBatchNo(&batchNo).
		SetNotNilFromwxid(req.Fromwxid).
		SetNotNilMsg(req.Msg).
		SetNotNilTag(req.Tag).
		SetTotal(total).
		SetNotNilStartTime(&startTime).
		SetNotNilSendTime(&sendTime).
		Save(l.ctx)
	if err != nil {
		l.rollback(tx)
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}

	// Record the messages in message_records: all messages of one recipient
	// are inserted with a single bulk statement to reduce load on the DB.
	for _, user := range recipients {
		builders := make([]*ent.MessageRecordsCreate, 0, len(actions))
		for _, msg := range actions {
			builders = append(builders, tx.MessageRecords.Create().
				SetStatus(1).
				SetNotNilBotWxid(req.Fromwxid).
				SetNotNilContactID(&user.ID).
				SetNotNilContactType(&user.Type).
				SetNotNilContactWxid(&user.Wxid).
				SetNotNilContentType(&msg.Type).
				SetNotNilContent(&msg.Content).
				SetNotNilMeta(msg.Meta).
				SetSendTime(sendTime),
			)
		}

		if err = tx.MessageRecords.CreateBulk(builders...).Exec(l.ctx); err != nil {
			l.rollback(tx)
			return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
		}
	}

	if err = tx.Commit(); err != nil {
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}

	return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
}

// parseOptionalTime parses raw with the layout "2006-01-02 15:04:05". It
// returns fallback when raw is nil or empty, and logs and returns the parse
// error otherwise.
func (l *CreateBatchMsgLogic) parseOptionalTime(raw *string, fallback time.Time) (time.Time, error) {
	if raw == nil || *raw == "" {
		return fallback, nil
	}

	parsed, err := time.Parse("2006-01-02 15:04:05", *raw)
	if err != nil {
		l.Logger.Errorf("时间字符串转换错误: %v", err)
		return time.Time{}, err
	}
	return parsed, nil
}

// decodeActions JSON-decodes raw into the list of actions to send. It returns
// nil when raw is nil, empty, or decodes to an empty array. Actions of type 1
// keep only Type and Content; every other type additionally carries a Meta
// holding the source filename.
func (l *CreateBatchMsgLogic) decodeActions(raw *string) ([]custom_types.Action, error) {
	if raw == nil || *raw == "" {
		return nil, nil
	}

	var decoded []custom_types.Action
	if err := json.Unmarshal([]byte(*raw), &decoded); err != nil {
		return nil, errors.New("解析JSON失败")
	}
	if len(decoded) == 0 {
		return nil, nil
	}

	actions := make([]custom_types.Action, len(decoded))
	for i, item := range decoded {
		actions[i] = custom_types.Action{
			Type:    item.Type,
			Content: item.Content,
		}
		if item.Type == 1 {
			continue
		}

		// The payload may omit "meta"; copy the filename only when it is
		// present instead of dereferencing a nil pointer.
		meta := &custom_types.Meta{}
		if item.Meta != nil {
			meta.Filename = item.Meta.Filename
		}
		actions[i].Meta = meta
	}
	return actions, nil
}

// findRecipients loads the contacts of type 1 or 2 whose wx_wxid equals
// req.Fromwxid. When sendToAll is false the result is restricted to contacts
// related to the labels named in req.Labels. req.Fromwxid must be non-nil.
func (l *CreateBatchMsgLogic) findRecipients(req *types.BatchMsgInfo, sendToAll bool) ([]*ent.Contact, error) {
	if sendToAll {
		return l.svcCtx.DB.Contact.Query().
			Where(contact.WxWxid(*req.Fromwxid), contact.TypeIn(1, 2)).
			All(l.ctx)
	}

	// Resolve the label names to label IDs.
	labelIDs, err := l.svcCtx.DB.Label.Query().
		Where(label.NameIn(req.Labels...)).
		IDs(l.ctx)
	if err != nil {
		return nil, err
	}

	// Collect the contact IDs attached to those labels.
	relations, err := l.svcCtx.DB.LabelRelationship.Query().
		Where(labelrelationship.LabelIDIn(labelIDs...)).
		All(l.ctx)
	if err != nil {
		return nil, err
	}
	if len(relations) == 0 {
		return nil, nil
	}

	contactIDs := make([]uint64, 0, len(relations))
	for _, rel := range relations {
		contactIDs = append(contactIDs, rel.ContactID)
	}

	return l.svcCtx.DB.Contact.Query().
		Where(
			contact.WxWxid(*req.Fromwxid),
			contact.IDIn(contactIDs...),
			contact.TypeIn(1, 2),
		).
		All(l.ctx)
}

// rollback aborts tx and logs a rollback failure; the caller is already
// returning the error that triggered the rollback, so this one is only logged.
func (l *CreateBatchMsgLogic) rollback(tx *ent.Tx) {
	if err := tx.Rollback(); err != nil {
		l.Logger.Errorf("rollback batch message transaction: %v", err)
	}
}