123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- package batch_msg
- import (
- "context"
- "encoding/json"
- "errors"
- "github.com/suyuan32/simple-admin-common/msg/errormsg"
- "github.com/suyuan32/simple-admin-common/utils/uuidx"
- "github.com/zeromicro/go-zero/core/errorx"
- "strings"
- "time"
- "wechat-api/ent"
- "wechat-api/ent/contact"
- "wechat-api/ent/custom_types"
- "wechat-api/ent/label"
- "wechat-api/ent/labelrelationship"
- "wechat-api/hook/aliyun"
- "wechat-api/internal/utils/dberrorhandler"
- "wechat-api/internal/svc"
- "wechat-api/internal/types"
- "github.com/zeromicro/go-zero/core/logx"
- )
- type CreateWhatcappBatchMsgLogic struct {
- logx.Logger
- ctx context.Context
- svcCtx *svc.ServiceContext
- MaxNumber int
- }
- func NewCreateWhatcappBatchMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateWhatcappBatchMsgLogic {
- return &CreateWhatcappBatchMsgLogic{
- Logger: logx.WithContext(ctx),
- ctx: ctx,
- svcCtx: svcCtx}
- }
- func (l *CreateWhatcappBatchMsgLogic) CreateWhatcappBatchMsg(req *types.BatchMsgInfo) (*types.BaseMsgResp, error) {
- organizationId := l.ctx.Value("organizationId").(uint64)
- l.MaxNumber = 1000 //批量处理的最大联系人数
- var err error
- var tagstring, tag string
- allContact := false
- tagIdArray := make([]uint64, 0)
- for _, labelId := range req.Labels {
- tagIdArray = append(tagIdArray, labelId)
- if labelId == uint64(0) { //全部
- allContact = true
- }
- }
- startTime := time.Now()
- sendNow := true
- // 把 req.Msg 字符串的内容 json_decode 到 msgArray
- // 然后再把每一条信息都给所有指定的用户记录到 message_records 表
- var msgArray []custom_types.Action
- if req.Msg != nil && *req.Msg != "" {
- err = json.Unmarshal([]byte(*req.Msg), &msgArray)
- if err != nil {
- return nil, errors.New("解析JSON失败")
- }
- }
- var msgActionList []custom_types.Action
- if len(msgArray) > 0 {
- msgActionList = make([]custom_types.Action, len(msgArray))
- for i, msg := range msgArray {
- if msg.Type == 1 {
- msgActionList[i] = custom_types.Action{
- Type: msg.Type,
- Content: msg.Content,
- }
- } else {
- msgActionList[i] = custom_types.Action{
- Type: msg.Type,
- Content: msg.Content,
- Meta: &custom_types.Meta{
- Filename: msg.Meta.Filename,
- },
- }
- }
- }
- }
- userList := make([]*ent.Contact, 0)
- tagMap := make(map[string][]uint64)
- if allContact {
- // 获取 contact 表中 cc+phone 匹配的联系人
- userList, err = l.svcCtx.DB.Contact.Query().Where(
- contact.OrganizationID(organizationId),
- contact.Ctype(2),
- ).All(l.ctx)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- tagids := make([]uint64, 0, 1)
- tagids = append(tagids, uint64(0))
- tagMap["contact_tag"] = tagids
- tagByte, err := json.Marshal(tagMap)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- tagstring = string(tagByte)
- tag = "全部"
- } else {
- if allContact { // 所有联系人
- // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1的数据
- userList, err = l.svcCtx.DB.Contact.Query().Where(
- contact.OrganizationID(organizationId),
- contact.Ctype(2),
- ).All(l.ctx)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- tag = "全部联系人"
- } else {
- userList, err = l.getContactList(req.Labels, *req.Cc, *req.Phone)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- }
- tagMap["contact_tag"] = req.Labels
- tagByte, err := json.Marshal(tagMap)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- tagstring = string(tagByte)
- tagArray := l.getLabelListByIds(req.Labels, req.GroupLabels)
- if tag != "" {
- tagArray = append(tagArray, tag)
- }
- tag = strings.Join(tagArray, ",")
- }
- // 这里是根据userlist 和 批量消息数 获得最终待发送消息总数
- total := int32(len(userList)) * int32(len(msgActionList))
- l.Logger.Infof("userList=%v user_total=%v\n", userList, total)
- if total == 0 {
- return &types.BaseMsgResp{Msg: "未查询到收信人,请重新选择", Code: 3}, nil
- }
- // 开启事务
- tx, err := l.svcCtx.DB.Tx(context.Background())
- batchNo := uuidx.NewUUID().String()
- var sendTime *time.Time
- if !sendNow {
- sendTime = &startTime
- }
- _, err = tx.BatchMsg.Create().
- SetNotNilBatchNo(&batchNo).
- SetStatus(0).
- //SetNotNilFromwxid(req.Fromwxid).
- SetNotNilMsg(req.Msg).
- SetNotNilTag(&tag).
- SetNotNilPhone(req.Phone).
- SetNotNilCc(req.Cc).
- SetNotNilTagids(&tagstring).
- SetTotal(total).
- SetNotNilTaskName(req.TaskName).
- SetNotNilStartTime(&startTime).
- SetNillableSendTime(sendTime).
- SetType(3).
- SetNotNilOrganizationID(&organizationId).
- SetCtype(2).
- Save(l.ctx)
- if err != nil {
- _ = tx.Rollback()
- l.Logger.Errorf("insert whatsapp batch_msg err=%v\n", err)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- }
- userSegment := make([]*ent.Contact, 0, l.MaxNumber)
- for _, user := range userList {
- userSegment = append(userSegment, user)
- if len(userSegment) == l.MaxNumber {
- err = l.addBatchMsg(userSegment, tx, req, batchNo)
- if err != nil {
- _ = tx.Rollback()
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- }
- userSegment = make([]*ent.Contact, 0, l.MaxNumber)
- }
- if len(userSegment) > 0 {
- err = l.addBatchMsg(userSegment, tx, req, batchNo)
- if err != nil {
- _ = tx.Rollback()
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
- }
- }
- _ = tx.Commit()
- return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
- }
- func (l *CreateWhatcappBatchMsgLogic) getContactList(labels []uint64, cc, phone string) ([]*ent.Contact, error) {
- // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
- labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labels...)).All(l.ctx)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
- }
- contact_ids := make([]uint64, 0, len(labelrelationships))
- for _, labelrelationship := range labelrelationships {
- contact_ids = append(contact_ids, labelrelationship.ContactID)
- }
- userList := make([]*ent.Contact, 0)
- if len(contact_ids) > 0 {
- // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
- userList, err = l.svcCtx.DB.Contact.Query().Where(
- contact.Cc(cc),
- contact.Phone(phone),
- contact.IDIn(contact_ids...),
- contact.Ctype(2),
- ).All(l.ctx)
- if err != nil {
- return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
- }
- }
- return userList, nil
- }
- func (l *CreateWhatcappBatchMsgLogic) getLabelListByIds(labels, groupLabels []uint64) []string {
- result := make([]string, 0)
- labels = append(labels, groupLabels...)
- if len(labels) > 0 {
- contacts, err := l.svcCtx.DB.Label.Query().Where(
- label.IDIn(labels...),
- ).Select("name").All(l.ctx)
- l.Logger.Infof("contacts=%v", contacts)
- if err != nil {
- return result
- }
- for _, val := range contacts {
- result = append(result, val.Name)
- }
- }
- return result
- }
- func (l *CreateWhatcappBatchMsgLogic) addBatchMsg(userList []*ent.Contact, tx *ent.Tx, req *types.BatchMsgInfo, batchNo string) error {
- // 发送批量消息
- sto := make([]string, 0, 100)
- for _, v := range userList {
- sto = append(sto, v.Cc+v.Phone)
- }
- fromPhone := *req.Cc + *req.Phone
- result, err := aliyun.SendBatchChatappMessage(*req.TemplateCode, *req.Lang, fromPhone, sto)
- if err != nil {
- return errorx.NewInvalidArgumentError(err.Error())
- }
- l.Logger.Infof("send_batch_chatapp_message result=%v\n", result)
- msgs := make([]*ent.MsgCreate, 0, l.MaxNumber)
- for _, user := range userList {
- phone := user.Cc + user.Phone
- msgRow := tx.Msg.Create().
- SetNotNilCc(req.Cc).
- SetNotNilPhone(req.Phone).
- SetNotNilToid(&phone).
- SetMsgtype(int32(1)).
- SetNotNilMsg(req.Msg).
- SetStatus(1).
- SetNotNilBatchNo(&batchNo)
- msgs = append(msgs, msgRow)
- }
- _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
- if err != nil {
- l.Logger.Errorf("msg CreateBulk err: %v", err)
- return err
- }
- return nil
- }
|