create_batch_msg_logic.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. package batch_msg
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "strings"
  7. "time"
  8. "wechat-api/ent/custom_types"
  9. "wechat-api/ent"
  10. "wechat-api/ent/contact"
  11. "wechat-api/ent/label"
  12. "wechat-api/ent/labelrelationship"
  13. "wechat-api/internal/svc"
  14. "wechat-api/internal/types"
  15. "wechat-api/internal/utils/dberrorhandler"
  16. "github.com/suyuan32/simple-admin-common/msg/errormsg"
  17. "github.com/suyuan32/simple-admin-common/utils/uuidx"
  18. "github.com/zeromicro/go-zero/core/logx"
  19. )
  20. type CreateBatchMsgLogic struct {
  21. ctx context.Context
  22. svcCtx *svc.ServiceContext
  23. logx.Logger
  24. }
  25. func NewCreateBatchMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateBatchMsgLogic {
  26. return &CreateBatchMsgLogic{
  27. ctx: ctx,
  28. svcCtx: svcCtx,
  29. Logger: logx.WithContext(ctx),
  30. }
  31. }
  32. func (l *CreateBatchMsgLogic) CreateBatchMsg(req *types.BatchMsgInfo) (*types.BaseMsgResp, error) {
  33. var err error
  34. organizationId := l.ctx.Value("organizationId").(uint64)
  35. all := false
  36. for _, labelName := range req.Labels {
  37. if strings.EqualFold(labelName, "all") || strings.EqualFold(labelName, "全部") {
  38. all = true
  39. tagstring := "all"
  40. req.Tag = &tagstring
  41. }
  42. }
  43. l.Logger.Infof("CreateBatchMsgLogic: %v", *req)
  44. startTime := time.Now()
  45. // req.StartTimeStr 不为nil并且不为空
  46. if req.StartTimeStr != nil && *req.StartTimeStr != "" {
  47. // 将 req.StartTimeStr 转换为 req.StartTime
  48. startTime, err = time.Parse("2006-01-02 15:04:05", *req.StartTimeStr)
  49. if err != nil {
  50. // 处理错误,例如打印错误并返回
  51. l.Logger.Errorf("时间字符串转换错误: %v", err)
  52. return nil, err
  53. }
  54. }
  55. // 定时发送时间
  56. var sendTime *time.Time
  57. if req.SendTimeStr != nil && *req.SendTimeStr != "" {
  58. sendTimeParse, err := time.Parse("2006-01-02 15:04:05", *req.SendTimeStr)
  59. if err != nil {
  60. // 处理错误,例如打印错误并返回
  61. l.Logger.Errorf("时间字符串转换错误: %v", err)
  62. return nil, err
  63. }
  64. sendTime = &sendTimeParse
  65. }
  66. // 把 req.Msg 字符串的内容 json_decode 到 msgArray
  67. // 然后再把每一条信息都给所有指定的用户记录到 message_records 表
  68. var msgArray []custom_types.Action
  69. if req.Msg != nil && *req.Msg != "" {
  70. err = json.Unmarshal([]byte(*req.Msg), &msgArray)
  71. if err != nil {
  72. return nil, errors.New("解析JSON失败")
  73. }
  74. }
  75. var msgActionList []custom_types.Action
  76. if len(msgArray) > 0 {
  77. msgActionList = make([]custom_types.Action, len(msgArray))
  78. for i, msg := range msgArray {
  79. if msg.Type == 1 {
  80. msgActionList[i] = custom_types.Action{
  81. Type: msg.Type,
  82. Content: msg.Content,
  83. }
  84. } else {
  85. msgActionList[i] = custom_types.Action{
  86. Type: msg.Type,
  87. Content: msg.Content,
  88. Meta: &custom_types.Meta{
  89. Filename: msg.Meta.Filename,
  90. },
  91. }
  92. }
  93. }
  94. }
  95. tagstring := strings.Join(req.Labels, ",")
  96. req.Tag = &tagstring
  97. userlist := make([]*ent.Contact, 0)
  98. if all {
  99. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据
  100. userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(*req.Fromwxid), contact.TypeIn(1, 2)).All(l.ctx)
  101. if err != nil {
  102. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  103. }
  104. } else {
  105. // 获取 label 表中 name 为 tags的记录
  106. labids, err := l.svcCtx.DB.Label.Query().Where(label.NameIn(req.Labels...)).IDs(l.ctx)
  107. if err != nil {
  108. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  109. }
  110. //l.Logger.Infof("CreateBatchMsgLogic labids= %v", labids)
  111. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  112. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labids...)).All(l.ctx)
  113. if err != nil {
  114. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  115. }
  116. contact_ids := make([]uint64, 0)
  117. for _, labelrelationship := range labelrelationships {
  118. contact_ids = append(contact_ids, labelrelationship.ContactID)
  119. }
  120. //l.Logger.Infof("CreateBatchMsgLogic contact_ids= %v", contact_ids)
  121. if len(contact_ids) > 0 {
  122. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  123. userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(*req.Fromwxid), contact.IDIn(contact_ids...), contact.TypeIn(1, 2)).All(l.ctx)
  124. if err != nil {
  125. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  126. }
  127. //l.Logger.Infof("CreateBatchMsgLogic userlist= %v", userlist)
  128. }
  129. }
  130. total := int32(len(userlist))
  131. if total == 0 {
  132. return &types.BaseMsgResp{Msg: errormsg.TargetNotFound}, nil
  133. }
  134. uuid := uuidx.NewUUID()
  135. batchNo := uuid.String()
  136. // 开始事务
  137. tx, err := l.svcCtx.DB.Tx(context.Background())
  138. if err != nil {
  139. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  140. }
  141. batchMsg, err := l.svcCtx.DB.BatchMsg.Create().
  142. SetNotNilBatchNo(&batchNo).
  143. SetNotNilFromwxid(req.Fromwxid).
  144. SetNotNilMsg(req.Msg).
  145. SetNotNilTag(req.Tag).
  146. SetTotal(total).
  147. SetNotNilTaskName(req.TaskName).
  148. SetNotNilStartTime(&startTime).
  149. SetNotNilSendTime(sendTime).
  150. SetNotNilType(req.Type).
  151. Save(l.ctx)
  152. if err != nil {
  153. _ = tx.Rollback()
  154. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  155. }
  156. // 批量记录信息到 message_records 表里
  157. for _, user := range userlist {
  158. // 每个用户的所有 信息 组合到一个数组里然后批量插入,减少DB的压力
  159. bulkCreate := make([]*ent.MessageRecordsCreate, 0)
  160. for idx, msg := range msgActionList {
  161. bulkCreate = append(bulkCreate, l.svcCtx.DB.MessageRecords.Create().
  162. SetStatus(1).
  163. SetNotNilBotWxid(req.Fromwxid).
  164. SetNotNilContactID(&user.ID).
  165. SetNotNilContactType(&user.Type).
  166. SetNotNilContentType(&msg.Type).
  167. SetNotNilContent(&msg.Content).
  168. SetNotNilMeta(msg.Meta).
  169. SetNotNilSendTime(sendTime).
  170. SetNotNilContactWxid(&user.Wxid). //接收方wxid
  171. SetSourceType(2). // 2:批量消息
  172. SetSourceID(batchMsg.ID). // 批量消息ID,batch_msg 表主键
  173. SetSubSourceID(uint64(idx)). // 用索引作为 sub_source_id
  174. SetOrganizationID(organizationId),
  175. )
  176. }
  177. err = tx.MessageRecords.CreateBulk(bulkCreate...).Exec(l.ctx)
  178. if err != nil {
  179. _ = tx.Rollback()
  180. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  181. }
  182. }
  183. _ = tx.Commit()
  184. return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
  185. }