package batch_msg

import (
	"context"
	"encoding/json"
	"strings"
	"time"

	"github.com/suyuan32/simple-admin-common/msg/errormsg"
	"github.com/suyuan32/simple-admin-common/utils/uuidx"
	"github.com/zeromicro/go-zero/core/logx"

	"wechat-api/ent"
	"wechat-api/ent/contact"
	"wechat-api/ent/label"
	"wechat-api/ent/labelrelationship"
	"wechat-api/hook/aliyun"
	"wechat-api/internal/svc"
	"wechat-api/internal/types"
	"wechat-api/internal/utils/dberrorhandler"
)

// CreateWhatcappBatchMsgLogic creates a WhatsApp batch message task: it
// resolves the recipients, sends the template message through Aliyun in
// segments and records one msg row per recipient plus one batch_msg row.
type CreateWhatcappBatchMsgLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
	// MaxNumber is the maximum number of contacts handled per segment.
	MaxNumber int
}

// NewCreateWhatcappBatchMsgLogic returns a logic instance bound to the given
// request context and service context.
func NewCreateWhatcappBatchMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateWhatcappBatchMsgLogic {
	return &CreateWhatcappBatchMsgLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx}
}

// CreateWhatcappBatchMsg resolves the recipients selected by req.Labels (label
// id 0 selects every contact of the organization with ctype 2), sends the
// batch in segments of at most MaxNumber contacts and stores the msg rows and
// the batch_msg summary row inside a single transaction.
//
// It returns a response with Code 3 and no error when required request fields
// are missing or when no recipient matches. Database failures are converted
// with dberrorhandler.DefaultEntError and the transaction is rolled back.
func (l *CreateWhatcappBatchMsgLogic) CreateWhatcappBatchMsg(req *types.BatchMsgInfo) (*types.BaseMsgResp, error) {
	// NOTE(review): this unchecked assertion panics when the context does not
	// carry a uint64 "organizationId" — presumably a middleware guarantees it;
	// confirm.
	organizationId := l.ctx.Value("organizationId").(uint64)
	l.MaxNumber = 1000 // maximum number of contacts processed per segment

	// These fields are dereferenced further down; reject the request instead
	// of panicking on a nil pointer.
	if req.Cc == nil || req.Phone == nil || req.TemplateCode == nil || req.Lang == nil {
		return &types.BaseMsgResp{Msg: "发送号码、模板编码和语言不能为空", Code: 3}, nil
	}

	// Label id 0 means "all contacts".
	allContact := false
	for _, labelId := range req.Labels {
		if labelId == 0 {
			allContact = true
			break
		}
	}

	startTime := time.Now()
	sendNow := true

	var (
		userList []*ent.Contact
		tagIds   []uint64
		tag      string
		err      error
	)
	if allContact {
		// Every contact of the organization with ctype 2.
		userList, err = l.svcCtx.DB.Contact.Query().Where(
			contact.OrganizationID(organizationId),
			contact.Ctype(2),
		).All(l.ctx)
		if err != nil {
			return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
		}
		tagIds = []uint64{0}
		tag = "全部"
	} else {
		userList, err = l.getContactList(req.Labels, *req.Cc, *req.Phone)
		if err != nil {
			return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
		}
		tagIds = req.Labels
		tag = strings.Join(l.getLabelListByIds(req.Labels), ",")
	}

	// The selected label ids are stored on the batch row as JSON.
	tagByte, err := json.Marshal(map[string][]uint64{"contact_tag": tagIds})
	if err != nil {
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}
	tagstring := string(tagByte)

	// Only the count is logged: the contact list itself can be very large and
	// contains phone numbers.
	total := int32(len(userList))
	l.Logger.Infof("user_total=%v\n", total)
	if total == 0 {
		return &types.BaseMsgResp{Msg: "未查询到收信人,请重新选择", Code: 3}, nil
	}

	// Open the transaction that covers the msg rows and the batch_msg row.
	// NOTE(review): the transaction is opened on context.Background() while
	// the statements run on l.ctx — confirm this is intentional.
	tx, err := l.svcCtx.DB.Tx(context.Background())
	if err != nil {
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}

	batchNo := uuidx.NewUUID().String()

	// sendNow is hard-coded to true, so sendTime stays nil here.
	var sendTime *time.Time
	if !sendNow {
		sendTime = &startTime
	}

	// success counts the contacts whose msg rows were stored. fail stays zero
	// on every path that reaches the insert below, because a failing segment
	// aborts the whole request.
	// NOTE(review): a failed provider call is only reflected in the msg row
	// status set by addBatchMsg, not in these counters — confirm intended.
	var success, fail int32
	for start := 0; start < len(userList); start += l.MaxNumber {
		end := start + l.MaxNumber
		if end > len(userList) {
			end = len(userList)
		}
		segment := userList[start:end]
		if err = l.addBatchMsg(segment, tx, req, batchNo); err != nil {
			l.rollback(tx)
			return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
		}
		success += int32(len(segment))
	}

	_, err = tx.BatchMsg.Create().
		SetNotNilBatchNo(&batchNo).
		SetStatus(2).
		//SetNotNilFromwxid(req.Fromwxid).
		SetNotNilMsg(req.Msg).
		SetNotNilTag(&tag).
		SetNotNilPhone(req.Phone).
		SetNotNilCc(req.Cc).
		SetNotNilTagids(&tagstring).
		SetTotal(total).
		SetNotNilTaskName(req.TaskName).
		SetNotNilStartTime(&startTime).
		SetNillableSendTime(sendTime).
		SetType(3).
		SetNotNilTemplateCode(req.TemplateCode).
		SetNotNilTemplateName(req.TemplateName).
		SetNotNilLang(req.Lang).
		SetNotNilOrganizationID(&organizationId).
		SetCtype(2).
		SetSuccess(success).
		SetFail(fail).
		Save(l.ctx)
	if err != nil {
		l.rollback(tx)
		l.Logger.Errorf("insert whatsapp batch_msg err=%v\n", err)
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}

	// A failed commit means nothing was persisted, so it must not be reported
	// as success.
	if err = tx.Commit(); err != nil {
		l.Logger.Errorf("commit whatsapp batch_msg err=%v\n", err)
		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
	}

	return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
}

// rollback rolls the transaction back and logs a rollback failure; the caller
// is already returning the error that triggered the rollback.
func (l *CreateWhatcappBatchMsgLogic) rollback(tx *ent.Tx) {
	if err := tx.Rollback(); err != nil {
		l.Logger.Errorf("rollback whatsapp batch_msg tx err=%v\n", err)
	}
}

// getContactList returns the contacts with ctype 2 that are linked to any of
// the given label ids through the label_relationship table. The raw query
// error is returned so that the caller converts it exactly once.
//
// cc and phone are accepted but not used by the query.
// NOTE(review): unlike the "all contacts" query in CreateWhatcappBatchMsg,
// this lookup is not filtered by organization — confirm this is intended.
func (l *CreateWhatcappBatchMsgLogic) getContactList(labels []uint64, cc, phone string) ([]*ent.Contact, error) {
	// Collect the contact ids attached to the requested labels.
	relations, err := l.svcCtx.DB.LabelRelationship.Query().
		Where(labelrelationship.LabelIDIn(labels...)).
		All(l.ctx)
	if err != nil {
		return nil, err
	}
	if len(relations) == 0 {
		return nil, nil
	}

	contactIDs := make([]uint64, 0, len(relations))
	for _, rel := range relations {
		contactIDs = append(contactIDs, rel.ContactID)
	}

	// Load the matching contacts, restricted to ctype 2.
	return l.svcCtx.DB.Contact.Query().Where(
		contact.IDIn(contactIDs...),
		contact.Ctype(2),
	).All(l.ctx)
}

// getLabelListByIds returns the names of the labels with the given ids. It is
// best effort: on a query error the error is logged and an empty list is
// returned.
func (l *CreateWhatcappBatchMsgLogic) getLabelListByIds(labels []uint64) []string {
	result := make([]string, 0, len(labels))
	if len(labels) == 0 {
		return result
	}

	rows, err := l.svcCtx.DB.Label.Query().Where(
		label.IDIn(labels...),
	).Select("name").All(l.ctx)
	if err != nil {
		l.Logger.Errorf("query label names err=%v\n", err)
		return result
	}
	l.Logger.Infof("contacts=%v", rows)

	for _, row := range rows {
		result = append(result, row.Name)
	}
	return result
}

// addBatchMsg sends the template message to every contact in userList with a
// single Aliyun call and stores one msg row per contact inside tx. The rows
// get status 1 when the Aliyun call returned no error and status 2 otherwise.
// Only a failure of the bulk insert is returned as an error.
//
// The caller must have verified that req.Cc, req.Phone, req.TemplateCode and
// req.Lang are non-nil.
// NOTE(review): the provider call happens before the rows are committed, so a
// later rollback cannot undo messages that were already sent.
func (l *CreateWhatcappBatchMsgLogic) addBatchMsg(userList []*ent.Contact, tx *ent.Tx, req *types.BatchMsgInfo, batchNo string) error {
	// Recipient numbers are country code + phone.
	recipients := make([]string, 0, len(userList))
	for _, user := range userList {
		recipients = append(recipients, user.Cc+user.Phone)
	}

	fromPhone := *req.Cc + *req.Phone
	result, err := aliyun.SendBatchChatappMessage(*req.TemplateCode, *req.Lang, fromPhone, recipients)
	l.Logger.Infof("send_batch_chatapp_message result=%v\n", result)

	var status uint8 = 1
	if err != nil {
		// The send failure is recorded on the rows rather than aborting.
		l.Logger.Errorf("send_batch_chatapp_message err=%v\n", err)
		status = 2
	}

	msgs := make([]*ent.MsgCreate, 0, len(userList))
	for i := range userList {
		toid := recipients[i]
		msgs = append(msgs, tx.Msg.Create().
			SetNotNilCc(req.Cc).
			SetNotNilPhone(req.Phone).
			SetNotNilToid(&toid).
			SetMsgtype(int32(1)).
			//SetNotNilMsg(req.Msg).
			SetStatus(status).
			SetNotNilBatchNo(&batchNo))
	}

	if _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx); err != nil {
		l.Logger.Errorf("msg CreateBulk err: %v", err)
		return err
	}
	return nil
}