// create_whatcapp_batch_msg_logic.go (6.9 KB)
  1. package batch_msg
  import (
  	"context"
  	"encoding/json"
  	"errors"
  	"strings"
  	"time"

  	"github.com/suyuan32/simple-admin-common/msg/errormsg"
  	"github.com/suyuan32/simple-admin-common/utils/uuidx"
  	"github.com/zeromicro/go-zero/core/logx"

  	"wechat-api/ent"
  	"wechat-api/ent/contact"
  	"wechat-api/ent/label"
  	"wechat-api/ent/labelrelationship"
  	"wechat-api/hook/aliyun"
  	"wechat-api/internal/svc"
  	"wechat-api/internal/types"
  	"wechat-api/internal/utils/dberrorhandler"
  )
  19. type CreateWhatcappBatchMsgLogic struct {
  20. logx.Logger
  21. ctx context.Context
  22. svcCtx *svc.ServiceContext
  23. MaxNumber int
  24. }
  25. func NewCreateWhatcappBatchMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateWhatcappBatchMsgLogic {
  26. return &CreateWhatcappBatchMsgLogic{
  27. Logger: logx.WithContext(ctx),
  28. ctx: ctx,
  29. svcCtx: svcCtx}
  30. }
  31. func (l *CreateWhatcappBatchMsgLogic) CreateWhatcappBatchMsg(req *types.BatchMsgInfo) (*types.BaseMsgResp, error) {
  32. organizationId := l.ctx.Value("organizationId").(uint64)
  33. l.MaxNumber = 1000 //批量处理的最大联系人数
  34. var err error
  35. var tagstring, tag string
  36. allContact := false
  37. tagIdArray := make([]uint64, 0)
  38. for _, labelId := range req.Labels {
  39. tagIdArray = append(tagIdArray, labelId)
  40. if labelId == uint64(0) { //全部
  41. allContact = true
  42. }
  43. }
  44. startTime := time.Now()
  45. sendNow := true
  46. userList := make([]*ent.Contact, 0)
  47. tagMap := make(map[string][]uint64)
  48. if allContact {
  49. // 获取 contact 表中 cc+phone 匹配的联系人
  50. userList, err = l.svcCtx.DB.Contact.Query().Where(
  51. contact.OrganizationID(organizationId),
  52. contact.Ctype(2),
  53. ).All(l.ctx)
  54. if err != nil {
  55. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  56. }
  57. tagids := make([]uint64, 0, 1)
  58. tagids = append(tagids, uint64(0))
  59. tagMap["contact_tag"] = tagids
  60. tagByte, err := json.Marshal(tagMap)
  61. if err != nil {
  62. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  63. }
  64. tagstring = string(tagByte)
  65. tag = "全部"
  66. } else {
  67. userList, err = l.getContactList(req.Labels, *req.Cc, *req.Phone)
  68. if err != nil {
  69. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  70. }
  71. tagMap["contact_tag"] = req.Labels
  72. tagByte, err := json.Marshal(tagMap)
  73. if err != nil {
  74. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  75. }
  76. tagstring = string(tagByte)
  77. tagArray := l.getLabelListByIds(req.Labels)
  78. if tag != "" {
  79. tagArray = append(tagArray, tag)
  80. }
  81. tag = strings.Join(tagArray, ",")
  82. }
  83. // 这里是根据userlist 和 批量消息数 获得最终待发送消息总数
  84. totalrows := int32(len(userList))
  85. l.Logger.Infof("userList=%v user_total=%v\n", userList, totalrows)
  86. if totalrows == 0 {
  87. return &types.BaseMsgResp{Msg: "未查询到收信人,请重新选择", Code: 3}, nil
  88. }
  89. // 开启事务
  90. tx, err := l.svcCtx.DB.Tx(context.Background())
  91. batchNo := uuidx.NewUUID().String()
  92. var sendTime *time.Time
  93. if !sendNow {
  94. sendTime = &startTime
  95. }
  96. var total, success, fail int32
  97. userSegment := make([]*ent.Contact, 0, l.MaxNumber)
  98. total = int32(len(userList))
  99. for _, user := range userList {
  100. userSegment = append(userSegment, user)
  101. if len(userSegment) == l.MaxNumber {
  102. err = l.addBatchMsg(userSegment, tx, req, batchNo)
  103. if err != nil {
  104. fail += int32(len(userSegment))
  105. _ = tx.Rollback()
  106. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  107. } else {
  108. success += int32(len(userSegment))
  109. userSegment = make([]*ent.Contact, 0, l.MaxNumber)
  110. }
  111. }
  112. }
  113. if len(userSegment) > 0 {
  114. err = l.addBatchMsg(userSegment, tx, req, batchNo)
  115. if err != nil {
  116. fail += int32(len(userSegment))
  117. _ = tx.Rollback()
  118. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  119. } else {
  120. success += int32(len(userSegment))
  121. }
  122. }
  123. _, err = tx.BatchMsg.Create().
  124. SetNotNilBatchNo(&batchNo).
  125. SetStatus(2).
  126. //SetNotNilFromwxid(req.Fromwxid).
  127. SetNotNilMsg(req.Msg).
  128. SetNotNilTag(&tag).
  129. SetNotNilPhone(req.Phone).
  130. SetNotNilCc(req.Cc).
  131. SetNotNilTagids(&tagstring).
  132. SetTotal(total).
  133. SetNotNilTaskName(req.TaskName).
  134. SetNotNilStartTime(&startTime).
  135. SetNillableSendTime(sendTime).
  136. SetType(3).
  137. SetNotNilTemplateCode(req.TemplateCode).
  138. SetNotNilTemplateName(req.TemplateName).
  139. SetNotNilLang(req.Lang).
  140. SetNotNilOrganizationID(&organizationId).
  141. SetCtype(2).
  142. SetSuccess(success).
  143. SetFail(fail).
  144. Save(l.ctx)
  145. if err != nil {
  146. _ = tx.Rollback()
  147. l.Logger.Errorf("insert whatsapp batch_msg err=%v\n", err)
  148. if err != nil {
  149. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  150. }
  151. }
  152. _ = tx.Commit()
  153. return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
  154. }
  155. func (l *CreateWhatcappBatchMsgLogic) getContactList(labels []uint64, cc, phone string) ([]*ent.Contact, error) {
  156. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  157. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labels...)).All(l.ctx)
  158. if err != nil {
  159. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  160. }
  161. contact_ids := make([]uint64, 0, len(labelrelationships))
  162. for _, labelrelationship := range labelrelationships {
  163. contact_ids = append(contact_ids, labelrelationship.ContactID)
  164. }
  165. userList := make([]*ent.Contact, 0)
  166. if len(contact_ids) > 0 {
  167. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  168. userList, err = l.svcCtx.DB.Contact.Query().Where(
  169. contact.IDIn(contact_ids...),
  170. contact.Ctype(2),
  171. ).All(l.ctx)
  172. if err != nil {
  173. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  174. }
  175. }
  176. return userList, nil
  177. }
  178. func (l *CreateWhatcappBatchMsgLogic) getLabelListByIds(labels []uint64) []string {
  179. result := make([]string, 0)
  180. if len(labels) > 0 {
  181. contacts, err := l.svcCtx.DB.Label.Query().Where(
  182. label.IDIn(labels...),
  183. ).Select("name").All(l.ctx)
  184. l.Logger.Infof("contacts=%v", contacts)
  185. if err != nil {
  186. return result
  187. }
  188. for _, val := range contacts {
  189. result = append(result, val.Name)
  190. }
  191. }
  192. return result
  193. }
  194. func (l *CreateWhatcappBatchMsgLogic) addBatchMsg(userList []*ent.Contact, tx *ent.Tx, req *types.BatchMsgInfo, batchNo string) error {
  195. // 发送批量消息
  196. sto := make([]string, 0, 100)
  197. for _, v := range userList {
  198. sto = append(sto, v.Cc+v.Phone)
  199. }
  200. fromPhone := *req.Cc + *req.Phone
  201. result, err := aliyun.SendBatchChatappMessage(*req.TemplateCode, *req.Lang, fromPhone, sto)
  202. l.Logger.Infof("send_batch_chatapp_message result=%v\n", result)
  203. var status uint8 = 1
  204. if err != nil {
  205. status = 2
  206. }
  207. msgs := make([]*ent.MsgCreate, 0, l.MaxNumber)
  208. for _, user := range userList {
  209. phone := user.Cc + user.Phone
  210. msgRow := tx.Msg.Create().
  211. SetNotNilCc(req.Cc).
  212. SetNotNilPhone(req.Phone).
  213. SetNotNilToid(&phone).
  214. SetMsgtype(int32(1)).
  215. //SetNotNilMsg(req.Msg).
  216. SetStatus(status).
  217. SetNotNilBatchNo(&batchNo)
  218. msgs = append(msgs, msgRow)
  219. }
  220. _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
  221. if err != nil {
  222. l.Logger.Errorf("msg CreateBulk err: %v", err)
  223. return err
  224. }
  225. return nil
  226. }