create_whatcapp_batch_msg_logic.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package batch_msg
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "github.com/suyuan32/simple-admin-common/msg/errormsg"
  7. "github.com/suyuan32/simple-admin-common/utils/uuidx"
  8. "github.com/zeromicro/go-zero/core/errorx"
  9. "strings"
  10. "time"
  11. "wechat-api/ent"
  12. "wechat-api/ent/contact"
  13. "wechat-api/ent/custom_types"
  14. "wechat-api/ent/label"
  15. "wechat-api/ent/labelrelationship"
  16. "wechat-api/hook/aliyun"
  17. "wechat-api/internal/utils/dberrorhandler"
  18. "wechat-api/internal/svc"
  19. "wechat-api/internal/types"
  20. "github.com/zeromicro/go-zero/core/logx"
  21. )
  22. type CreateWhatcappBatchMsgLogic struct {
  23. logx.Logger
  24. ctx context.Context
  25. svcCtx *svc.ServiceContext
  26. MaxNumber int
  27. }
  28. func NewCreateWhatcappBatchMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateWhatcappBatchMsgLogic {
  29. return &CreateWhatcappBatchMsgLogic{
  30. Logger: logx.WithContext(ctx),
  31. ctx: ctx,
  32. svcCtx: svcCtx}
  33. }
  34. func (l *CreateWhatcappBatchMsgLogic) CreateWhatcappBatchMsg(req *types.BatchMsgInfo) (*types.BaseMsgResp, error) {
  35. organizationId := l.ctx.Value("organizationId").(uint64)
  36. l.MaxNumber = 1000 //批量处理的最大联系人数
  37. var err error
  38. var tagstring, tag string
  39. allContact := false
  40. tagIdArray := make([]uint64, 0)
  41. for _, labelId := range req.Labels {
  42. tagIdArray = append(tagIdArray, labelId)
  43. if labelId == uint64(0) { //全部
  44. allContact = true
  45. }
  46. }
  47. startTime := time.Now()
  48. sendNow := true
  49. // 把 req.Msg 字符串的内容 json_decode 到 msgArray
  50. // 然后再把每一条信息都给所有指定的用户记录到 message_records 表
  51. var msgArray []custom_types.Action
  52. if req.Msg != nil && *req.Msg != "" {
  53. err = json.Unmarshal([]byte(*req.Msg), &msgArray)
  54. if err != nil {
  55. return nil, errors.New("解析JSON失败")
  56. }
  57. }
  58. var msgActionList []custom_types.Action
  59. if len(msgArray) > 0 {
  60. msgActionList = make([]custom_types.Action, len(msgArray))
  61. for i, msg := range msgArray {
  62. if msg.Type == 1 {
  63. msgActionList[i] = custom_types.Action{
  64. Type: msg.Type,
  65. Content: msg.Content,
  66. }
  67. } else {
  68. msgActionList[i] = custom_types.Action{
  69. Type: msg.Type,
  70. Content: msg.Content,
  71. Meta: &custom_types.Meta{
  72. Filename: msg.Meta.Filename,
  73. },
  74. }
  75. }
  76. }
  77. }
  78. userList := make([]*ent.Contact, 0)
  79. tagMap := make(map[string][]uint64)
  80. if allContact {
  81. // 获取 contact 表中 cc+phone 匹配的联系人
  82. userList, err = l.svcCtx.DB.Contact.Query().Where(
  83. contact.OrganizationID(organizationId),
  84. contact.Ctype(2),
  85. ).All(l.ctx)
  86. if err != nil {
  87. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  88. }
  89. tagids := make([]uint64, 0, 1)
  90. tagids = append(tagids, uint64(0))
  91. tagMap["contact_tag"] = tagids
  92. tagByte, err := json.Marshal(tagMap)
  93. if err != nil {
  94. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  95. }
  96. tagstring = string(tagByte)
  97. tag = "全部"
  98. } else {
  99. if allContact { // 所有联系人
  100. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1的数据
  101. userList, err = l.svcCtx.DB.Contact.Query().Where(
  102. contact.OrganizationID(organizationId),
  103. contact.Ctype(2),
  104. ).All(l.ctx)
  105. if err != nil {
  106. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  107. }
  108. tag = "全部联系人"
  109. } else {
  110. userList, err = l.getContactList(req.Labels, *req.Cc, *req.Phone)
  111. if err != nil {
  112. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  113. }
  114. }
  115. tagMap["contact_tag"] = req.Labels
  116. tagByte, err := json.Marshal(tagMap)
  117. if err != nil {
  118. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  119. }
  120. tagstring = string(tagByte)
  121. tagArray := l.getLabelListByIds(req.Labels, req.GroupLabels)
  122. if tag != "" {
  123. tagArray = append(tagArray, tag)
  124. }
  125. tag = strings.Join(tagArray, ",")
  126. }
  127. // 这里是根据userlist 和 批量消息数 获得最终待发送消息总数
  128. total := int32(len(userList)) * int32(len(msgActionList))
  129. l.Logger.Infof("userList=%v user_total=%v\n", userList, total)
  130. if total == 0 {
  131. return &types.BaseMsgResp{Msg: "未查询到收信人,请重新选择", Code: 3}, nil
  132. }
  133. // 开启事务
  134. tx, err := l.svcCtx.DB.Tx(context.Background())
  135. batchNo := uuidx.NewUUID().String()
  136. var sendTime *time.Time
  137. if !sendNow {
  138. sendTime = &startTime
  139. }
  140. _, err = tx.BatchMsg.Create().
  141. SetNotNilBatchNo(&batchNo).
  142. SetStatus(0).
  143. //SetNotNilFromwxid(req.Fromwxid).
  144. SetNotNilMsg(req.Msg).
  145. SetNotNilTag(&tag).
  146. SetNotNilPhone(req.Phone).
  147. SetNotNilCc(req.Cc).
  148. SetNotNilTagids(&tagstring).
  149. SetTotal(total).
  150. SetNotNilTaskName(req.TaskName).
  151. SetNotNilStartTime(&startTime).
  152. SetNillableSendTime(sendTime).
  153. SetType(3).
  154. SetNotNilOrganizationID(&organizationId).
  155. SetCtype(2).
  156. Save(l.ctx)
  157. if err != nil {
  158. _ = tx.Rollback()
  159. l.Logger.Errorf("insert whatsapp batch_msg err=%v\n", err)
  160. if err != nil {
  161. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  162. }
  163. }
  164. userSegment := make([]*ent.Contact, 0, l.MaxNumber)
  165. for _, user := range userList {
  166. userSegment = append(userSegment, user)
  167. if len(userSegment) == l.MaxNumber {
  168. err = l.addBatchMsg(userSegment, tx, req, batchNo)
  169. if err != nil {
  170. _ = tx.Rollback()
  171. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  172. }
  173. }
  174. userSegment = make([]*ent.Contact, 0, l.MaxNumber)
  175. }
  176. if len(userSegment) > 0 {
  177. err = l.addBatchMsg(userSegment, tx, req, batchNo)
  178. if err != nil {
  179. _ = tx.Rollback()
  180. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  181. }
  182. }
  183. _ = tx.Commit()
  184. return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
  185. }
  186. func (l *CreateWhatcappBatchMsgLogic) getContactList(labels []uint64, cc, phone string) ([]*ent.Contact, error) {
  187. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  188. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labels...)).All(l.ctx)
  189. if err != nil {
  190. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  191. }
  192. contact_ids := make([]uint64, 0, len(labelrelationships))
  193. for _, labelrelationship := range labelrelationships {
  194. contact_ids = append(contact_ids, labelrelationship.ContactID)
  195. }
  196. userList := make([]*ent.Contact, 0)
  197. if len(contact_ids) > 0 {
  198. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  199. userList, err = l.svcCtx.DB.Contact.Query().Where(
  200. contact.Cc(cc),
  201. contact.Phone(phone),
  202. contact.IDIn(contact_ids...),
  203. contact.Ctype(2),
  204. ).All(l.ctx)
  205. if err != nil {
  206. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  207. }
  208. }
  209. return userList, nil
  210. }
  211. func (l *CreateWhatcappBatchMsgLogic) getLabelListByIds(labels, groupLabels []uint64) []string {
  212. result := make([]string, 0)
  213. labels = append(labels, groupLabels...)
  214. if len(labels) > 0 {
  215. contacts, err := l.svcCtx.DB.Label.Query().Where(
  216. label.IDIn(labels...),
  217. ).Select("name").All(l.ctx)
  218. l.Logger.Infof("contacts=%v", contacts)
  219. if err != nil {
  220. return result
  221. }
  222. for _, val := range contacts {
  223. result = append(result, val.Name)
  224. }
  225. }
  226. return result
  227. }
  228. func (l *CreateWhatcappBatchMsgLogic) addBatchMsg(userList []*ent.Contact, tx *ent.Tx, req *types.BatchMsgInfo, batchNo string) error {
  229. // 发送批量消息
  230. sto := make([]string, 0, 100)
  231. for _, v := range userList {
  232. sto = append(sto, v.Cc+v.Phone)
  233. }
  234. fromPhone := *req.Cc + *req.Phone
  235. result, err := aliyun.SendBatchChatappMessage(*req.TemplateCode, *req.Lang, fromPhone, sto)
  236. if err != nil {
  237. return errorx.NewInvalidArgumentError(err.Error())
  238. }
  239. l.Logger.Infof("send_batch_chatapp_message result=%v\n", result)
  240. msgs := make([]*ent.MsgCreate, 0, l.MaxNumber)
  241. for _, user := range userList {
  242. phone := user.Cc + user.Phone
  243. msgRow := tx.Msg.Create().
  244. SetNotNilCc(req.Cc).
  245. SetNotNilPhone(req.Phone).
  246. SetNotNilToid(&phone).
  247. SetMsgtype(int32(1)).
  248. SetNotNilMsg(req.Msg).
  249. SetStatus(1).
  250. SetNotNilBatchNo(&batchNo)
  251. msgs = append(msgs, msgRow)
  252. }
  253. _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
  254. if err != nil {
  255. l.Logger.Errorf("msg CreateBulk err: %v", err)
  256. return err
  257. }
  258. return nil
  259. }