send_wx.go 5.7 KB


  1. package crontask
  2. import (
  3. "context"
  4. "encoding/json"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "wechat-api/ent"
  10. "wechat-api/ent/contact"
  11. "wechat-api/ent/messagerecords"
  12. "wechat-api/ent/server"
  13. "wechat-api/ent/wx"
  14. "wechat-api/hook"
  15. "wechat-api/internal/types/payload"
  16. )
  17. func (l *CronTask) sendWx() {
  18. ctx := context.Background()
  19. config :=
  20. `{
  21. "ip": "172.18.41.219",
  22. "number": 10,
  23. "wx_list": [
  24. {
  25. "port": "30001",
  26. "wxid": "wxid_77au928zeb2p12",
  27. "nickname": "百事通"
  28. },
  29. {
  30. "port": "30007",
  31. "wxid": "wxid_edc0mvp188ms22",
  32. "nickname": "爱博闻"
  33. },
  34. {
  35. "port": "30040",
  36. "wxid": "wxid_wupstwe851nb12",
  37. "nickname": "轻马小助手"
  38. }
  39. ]
  40. }`
  41. var p payload.SendWxPayload
  42. if err := json.Unmarshal([]byte(config), &p); err != nil {
  43. l.Logger.Errorf("failed to unmarshal the payload :%v", err)
  44. return
  45. }
  46. // 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
  47. messages, err := l.svcCtx.DB.MessageRecords.Query().
  48. Where(messagerecords.StatusIn(1, 4)).
  49. Order(ent.Asc(messagerecords.FieldCreatedAt)).
  50. Limit(p.Number).
  51. All(ctx)
  52. if err != nil {
  53. l.Logger.Errorf("get messageRecords list failed %v", err)
  54. return
  55. }
  56. startTime := time.Now()
  57. type SendTextMsgReq struct {
  58. Wxid string `json:"wxid"`
  59. Msg string `json:"msg"`
  60. }
  61. type SendFileMsgReq struct {
  62. Wxid string `json:"wxid"`
  63. Filepath string `json:"filepath"`
  64. Diyfilename string `json:"diyfilename"`
  65. }
  66. // 从 Redis 中获取服务器信息(ip, port)
  67. getServerInfo := func(wxid string) (string, error) {
  68. key := "crontask_wx_server_info"
  69. val, _ := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
  70. //if err != nil {
  71. // return "", err
  72. //}
  73. if val == "" {
  74. wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
  75. if err != nil {
  76. l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
  77. return "", err
  78. }
  79. if wx.ServerID != 0 {
  80. server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
  81. if err != nil {
  82. l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
  83. return "", err
  84. }
  85. ctype := strconv.Itoa(int(wx.Ctype))
  86. val = server.PrivateIP + ":" + server.AdminPort + ":" + wx.Port + ":" + ctype
  87. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  88. }
  89. }
  90. return val, nil
  91. }
  92. // 从 Redis 中获取服务器信息(ip, port)
  93. getContactInfo := func(botwxid string, wxid string) (*ent.Contact, error) {
  94. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
  95. contact.WxWxidEQ(botwxid),
  96. contact.WxidEQ(wxid),
  97. contact.CtypeIn(1, 3),
  98. ).First(l.ctx)
  99. return contactInfo, nil
  100. }
  101. for _, v := range messages {
  102. // 更新 status 值为 2(发送中)
  103. tx, _ := l.svcCtx.DB.Tx(l.ctx)
  104. if v.Content != "" {
  105. serverInfo, _ := getServerInfo(v.BotWxid)
  106. serverIp := ""
  107. adminPort := ""
  108. wxPort := ""
  109. ctype := ""
  110. if serverInfo != "" {
  111. infoArray := strings.Split(serverInfo, ":")
  112. serverIp, adminPort, wxPort, ctype = infoArray[0], infoArray[1], infoArray[2], infoArray[3]
  113. }
  114. _, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
  115. if err != nil {
  116. l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
  117. continue
  118. }
  119. var hookClient *hook.Hook
  120. if ctype == "3" { //企微
  121. hookClient = hook.NewWecomHook("", adminPort, wxPort)
  122. } else { //个微
  123. hookClient = hook.NewHook(serverIp, adminPort, wxPort)
  124. }
  125. if v.ContentType == 1 {
  126. content := v.Content
  127. if containsPlaceholder(content) {
  128. contactInfo, _ := getContactInfo(v.BotWxid, v.ContactWxid)
  129. content = varReplace(content, contactInfo)
  130. }
  131. err = hookClient.SendTextMsg(v.ContactWxid, content, v.BotWxid)
  132. } else {
  133. re := regexp.MustCompile(`[^/]+$`)
  134. fileName := re.FindString(v.Content)
  135. err = hookClient.SendPicMsg(v.ContactWxid, v.Content, fileName, v.BotWxid)
  136. }
  137. if err != nil {
  138. _ = tx.Rollback()
  139. continue
  140. } else {
  141. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  142. if err != nil {
  143. _ = tx.Rollback()
  144. } else {
  145. _ = tx.Commit()
  146. }
  147. }
  148. time.Sleep(time.Duration(60/p.Number) * time.Second)
  149. } else {
  150. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  151. if err != nil {
  152. _ = tx.Rollback()
  153. } else {
  154. _ = tx.Commit()
  155. }
  156. }
  157. }
  158. finishTime := time.Now()
  159. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  160. return
  161. }
  162. func containsPlaceholder(s string) bool {
  163. pattern := `\$\{.*?\}`
  164. matched, _ := regexp.MatchString(pattern, s)
  165. return matched
  166. }
  167. func varReplace(s string, contactInfo *ent.Contact) string {
  168. nickname := ""
  169. var cname, carea, cbirthday, cbirtharea, cidcard_no, ctitle, sex, cage string
  170. if contactInfo != nil {
  171. nickname = contactInfo.Nickname
  172. carea = contactInfo.Carea
  173. cname = contactInfo.Cname
  174. cbirthday = contactInfo.Cbirthday
  175. cbirtharea = contactInfo.Cbirtharea
  176. cidcard_no = contactInfo.CidcardNo
  177. ctitle = contactInfo.Ctitle
  178. if contactInfo.Sex == 1 {
  179. sex = "男"
  180. } else if contactInfo.Sex == 2 {
  181. sex = "女"
  182. }
  183. age := contactInfo.Cage
  184. if age > 0 {
  185. cage = strconv.Itoa(age)
  186. }
  187. }
  188. s = strings.Replace(s, "${nickname}", nickname, -1)
  189. s = strings.Replace(s, "${cname}", cname, -1)
  190. s = strings.Replace(s, "${carea}", carea, -1)
  191. s = strings.Replace(s, "${cbirthday}", cbirthday, -1)
  192. s = strings.Replace(s, "${cbirtharea}", cbirtharea, -1)
  193. s = strings.Replace(s, "${cidcard_no}", cidcard_no, -1)
  194. s = strings.Replace(s, "${ctitle}", ctitle, -1)
  195. s = strings.Replace(s, "${sex}", sex, -1)
  196. s = strings.Replace(s, "${cage}", cage, -1)
  197. return s
  198. }