create_whatcapp_batch_msg_logic.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package batch_msg
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "github.com/suyuan32/simple-admin-common/msg/errormsg"
  7. "github.com/suyuan32/simple-admin-common/utils/uuidx"
  8. "strings"
  9. "time"
  10. "wechat-api/ent"
  11. "wechat-api/ent/contact"
  12. "wechat-api/ent/custom_types"
  13. "wechat-api/ent/label"
  14. "wechat-api/ent/labelrelationship"
  15. "wechat-api/hook/aliyun"
  16. "wechat-api/internal/utils/dberrorhandler"
  17. "wechat-api/internal/svc"
  18. "wechat-api/internal/types"
  19. "github.com/zeromicro/go-zero/core/logx"
  20. )
  21. type CreateWhatcappBatchMsgLogic struct {
  22. logx.Logger
  23. ctx context.Context
  24. svcCtx *svc.ServiceContext
  25. MaxNumber int
  26. }
  27. func NewCreateWhatcappBatchMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateWhatcappBatchMsgLogic {
  28. return &CreateWhatcappBatchMsgLogic{
  29. Logger: logx.WithContext(ctx),
  30. ctx: ctx,
  31. svcCtx: svcCtx}
  32. }
  33. func (l *CreateWhatcappBatchMsgLogic) CreateWhatcappBatchMsg(req *types.BatchMsgInfo) (*types.BaseMsgResp, error) {
  34. organizationId := l.ctx.Value("organizationId").(uint64)
  35. l.MaxNumber = 1000 //批量处理的最大联系人数
  36. var err error
  37. var tagstring, tag string
  38. allContact := false
  39. tagIdArray := make([]uint64, 0)
  40. for _, labelId := range req.Labels {
  41. tagIdArray = append(tagIdArray, labelId)
  42. if labelId == uint64(0) { //全部
  43. allContact = true
  44. }
  45. }
  46. startTime := time.Now()
  47. sendNow := true
  48. // 把 req.Msg 字符串的内容 json_decode 到 msgArray
  49. // 然后再把每一条信息都给所有指定的用户记录到 message_records 表
  50. var msgArray []custom_types.Action
  51. if req.Msg != nil && *req.Msg != "" {
  52. err = json.Unmarshal([]byte(*req.Msg), &msgArray)
  53. if err != nil {
  54. return nil, errors.New("解析JSON失败")
  55. }
  56. }
  57. var msgActionList []custom_types.Action
  58. if len(msgArray) > 0 {
  59. msgActionList = make([]custom_types.Action, len(msgArray))
  60. for i, msg := range msgArray {
  61. if msg.Type == 1 {
  62. msgActionList[i] = custom_types.Action{
  63. Type: msg.Type,
  64. Content: msg.Content,
  65. }
  66. } else {
  67. msgActionList[i] = custom_types.Action{
  68. Type: msg.Type,
  69. Content: msg.Content,
  70. Meta: &custom_types.Meta{
  71. Filename: msg.Meta.Filename,
  72. },
  73. }
  74. }
  75. }
  76. }
  77. userList := make([]*ent.Contact, 0)
  78. tagMap := make(map[string][]uint64)
  79. if allContact {
  80. // 获取 contact 表中 cc+phone 匹配的联系人
  81. userList, err = l.svcCtx.DB.Contact.Query().Where(
  82. contact.OrganizationID(organizationId),
  83. contact.Ctype(2),
  84. ).All(l.ctx)
  85. if err != nil {
  86. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  87. }
  88. tagids := make([]uint64, 0, 1)
  89. tagids = append(tagids, uint64(0))
  90. tagMap["contact_tag"] = tagids
  91. tagByte, err := json.Marshal(tagMap)
  92. if err != nil {
  93. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  94. }
  95. tagstring = string(tagByte)
  96. tag = "全部"
  97. } else {
  98. if allContact { // 所有联系人
  99. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 的 type 为1的数据
  100. userList, err = l.svcCtx.DB.Contact.Query().Where(
  101. contact.OrganizationID(organizationId),
  102. contact.Ctype(2),
  103. ).All(l.ctx)
  104. if err != nil {
  105. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  106. }
  107. tag = "全部联系人"
  108. } else {
  109. userList, err = l.getContactList(req.Labels, *req.Cc, *req.Phone)
  110. if err != nil {
  111. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  112. }
  113. }
  114. tagMap["contact_tag"] = req.Labels
  115. tagByte, err := json.Marshal(tagMap)
  116. if err != nil {
  117. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  118. }
  119. tagstring = string(tagByte)
  120. tagArray := l.getLabelListByIds(req.Labels, req.GroupLabels)
  121. if tag != "" {
  122. tagArray = append(tagArray, tag)
  123. }
  124. tag = strings.Join(tagArray, ",")
  125. }
  126. // 这里是根据userlist 和 批量消息数 获得最终待发送消息总数
  127. total := int32(len(userList)) * int32(len(msgActionList))
  128. l.Logger.Infof("userList=%v user_total=%v\n", userList, total)
  129. if total == 0 {
  130. return &types.BaseMsgResp{Msg: "未查询到收信人,请重新选择", Code: 3}, nil
  131. }
  132. // 开启事务
  133. tx, err := l.svcCtx.DB.Tx(context.Background())
  134. batchNo := uuidx.NewUUID().String()
  135. var sendTime *time.Time
  136. if !sendNow {
  137. sendTime = &startTime
  138. }
  139. _, err = tx.BatchMsg.Create().
  140. SetNotNilBatchNo(&batchNo).
  141. SetStatus(0).
  142. //SetNotNilFromwxid(req.Fromwxid).
  143. SetNotNilMsg(req.Msg).
  144. SetNotNilTag(&tag).
  145. SetNotNilPhone(req.Phone).
  146. SetNotNilCc(req.Cc).
  147. SetNotNilTagids(&tagstring).
  148. SetTotal(total).
  149. SetNotNilTaskName(req.TaskName).
  150. SetNotNilStartTime(&startTime).
  151. SetNillableSendTime(sendTime).
  152. SetType(3).
  153. SetNotNilOrganizationID(&organizationId).
  154. SetCtype(2).
  155. Save(l.ctx)
  156. if err != nil {
  157. _ = tx.Rollback()
  158. l.Logger.Errorf("insert whatsapp batch_msg err=%v\n", err)
  159. if err != nil {
  160. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  161. }
  162. }
  163. userSegment := make([]*ent.Contact, 0, l.MaxNumber)
  164. for _, user := range userList {
  165. userSegment = append(userSegment, user)
  166. if len(userSegment) == l.MaxNumber {
  167. err = l.addBatchMsg(userSegment, tx, req, batchNo)
  168. if err != nil {
  169. _ = tx.Rollback()
  170. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  171. }
  172. }
  173. userSegment = make([]*ent.Contact, 0, l.MaxNumber)
  174. }
  175. if len(userSegment) > 0 {
  176. err = l.addBatchMsg(userSegment, tx, req, batchNo)
  177. if err != nil {
  178. _ = tx.Rollback()
  179. return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
  180. }
  181. }
  182. _ = tx.Commit()
  183. return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
  184. }
  185. func (l *CreateWhatcappBatchMsgLogic) getContactList(labels []uint64, cc, phone string) ([]*ent.Contact, error) {
  186. // 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
  187. labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labels...)).All(l.ctx)
  188. if err != nil {
  189. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  190. }
  191. contact_ids := make([]uint64, 0, len(labelrelationships))
  192. for _, labelrelationship := range labelrelationships {
  193. contact_ids = append(contact_ids, labelrelationship.ContactID)
  194. }
  195. userList := make([]*ent.Contact, 0)
  196. if len(contact_ids) > 0 {
  197. // 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
  198. userList, err = l.svcCtx.DB.Contact.Query().Where(
  199. contact.Cc(cc),
  200. contact.Phone(phone),
  201. contact.IDIn(contact_ids...),
  202. contact.Ctype(2),
  203. ).All(l.ctx)
  204. if err != nil {
  205. return nil, dberrorhandler.DefaultEntError(l.Logger, err, nil)
  206. }
  207. }
  208. return userList, nil
  209. }
  210. func (l *CreateWhatcappBatchMsgLogic) getLabelListByIds(labels, groupLabels []uint64) []string {
  211. result := make([]string, 0)
  212. labels = append(labels, groupLabels...)
  213. if len(labels) > 0 {
  214. contacts, err := l.svcCtx.DB.Label.Query().Where(
  215. label.IDIn(labels...),
  216. ).Select("name").All(l.ctx)
  217. l.Logger.Infof("contacts=%v", contacts)
  218. if err != nil {
  219. return result
  220. }
  221. for _, val := range contacts {
  222. result = append(result, val.Name)
  223. }
  224. }
  225. return result
  226. }
  227. func (l *CreateWhatcappBatchMsgLogic) addBatchMsg(userList []*ent.Contact, tx *ent.Tx, req *types.BatchMsgInfo, batchNo string) error {
  228. // 发送批量消息
  229. sto := make([]string, 0, 100)
  230. for _, v := range userList {
  231. sto = append(sto, v.Cc+v.Phone)
  232. }
  233. fromPhone := *req.Cc + *req.Phone
  234. result, err := aliyun.SendBatchChatappMessage(*req.TemplateCode, *req.Lang, fromPhone, sto)
  235. l.Logger.Infof("send_batch_chatapp_message result=%v\n", result)
  236. var status uint8 = 1
  237. if err != nil {
  238. status = 2
  239. }
  240. msgs := make([]*ent.MsgCreate, 0, l.MaxNumber)
  241. for _, user := range userList {
  242. phone := user.Cc + user.Phone
  243. msgRow := tx.Msg.Create().
  244. SetNotNilCc(req.Cc).
  245. SetNotNilPhone(req.Phone).
  246. SetNotNilToid(&phone).
  247. SetMsgtype(int32(1)).
  248. //SetNotNilMsg(req.Msg).
  249. SetStatus(status).
  250. SetNotNilBatchNo(&batchNo)
  251. msgs = append(msgs, msgRow)
  252. }
  253. _, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
  254. if err != nil {
  255. l.Logger.Errorf("msg CreateBulk err: %v", err)
  256. return err
  257. }
  258. return nil
  259. }