send_wx.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package crontask
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "regexp"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "wechat-api/ent"
  12. "wechat-api/ent/contact"
  13. "wechat-api/ent/messagerecords"
  14. "wechat-api/ent/server"
  15. "wechat-api/ent/wx"
  16. "wechat-api/hook"
  17. "wechat-api/internal/types/payload"
  18. )
  19. func (l *CronTask) sendWx() {
  20. ctx := context.Background()
  21. config :=
  22. `{
  23. "ip": "172.18.41.219",
  24. "number": 10,
  25. "wx_list": [
  26. {
  27. "port": "30001",
  28. "wxid": "wxid_77au928zeb2p12",
  29. "nickname": "百事通"
  30. },
  31. {
  32. "port": "30007",
  33. "wxid": "wxid_edc0mvp188ms22",
  34. "nickname": "爱博闻"
  35. },
  36. {
  37. "port": "30040",
  38. "wxid": "wxid_wupstwe851nb12",
  39. "nickname": "轻马小助手"
  40. }
  41. ]
  42. }`
  43. var p payload.SendWxPayload
  44. if err := json.Unmarshal([]byte(config), &p); err != nil {
  45. l.Logger.Errorf("failed to unmarshal the payload :%v", err)
  46. return
  47. }
  48. // 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
  49. messages, err := l.svcCtx.DB.MessageRecords.Query().
  50. Where(messagerecords.StatusIn(1, 4)).
  51. Order(ent.Asc(messagerecords.FieldCreatedAt)).
  52. Limit(p.Number).
  53. All(ctx)
  54. if err != nil {
  55. l.Logger.Errorf("get messageRecords list failed %v", err)
  56. return
  57. }
  58. startTime := time.Now()
  59. type SendTextMsgReq struct {
  60. Wxid string `json:"wxid"`
  61. Msg string `json:"msg"`
  62. }
  63. type SendFileMsgReq struct {
  64. Wxid string `json:"wxid"`
  65. Filepath string `json:"filepath"`
  66. Diyfilename string `json:"diyfilename"`
  67. }
  68. // 从 Redis 中获取服务器信息(ip, port)
  69. getServerInfo := func(wxid string) (string, error) {
  70. key := "crontask_wx_server_info"
  71. val, _ := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
  72. if val == "" {
  73. wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
  74. if err != nil {
  75. l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
  76. return "", err
  77. }
  78. if wx.ServerID != 0 {
  79. server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
  80. if err != nil {
  81. l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
  82. return "", err
  83. }
  84. ctype := strconv.Itoa(int(wx.Ctype))
  85. val = server.PrivateIP + ":" + server.AdminPort + ":" + wx.Port + ":" + ctype
  86. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  87. } else {
  88. val = "0:0" + ":" + wx.Port + ":" + strconv.Itoa(int(wx.Ctype))
  89. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  90. }
  91. }
  92. return val, nil
  93. }
  94. // 从 Redis 中获取服务器信息(ip, port)
  95. getContactInfo := func(botwxid string, wxid string) (*ent.Contact, error) {
  96. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
  97. contact.WxWxidEQ(botwxid),
  98. contact.WxidEQ(wxid),
  99. contact.CtypeIn(1, 3),
  100. ).First(l.ctx)
  101. return contactInfo, nil
  102. }
  103. for _, v := range messages {
  104. // 更新 status 值为 2(发送中)
  105. tx, _ := l.svcCtx.DB.Tx(l.ctx)
  106. if v.Content != "" {
  107. serverInfo, _ := getServerInfo(v.BotWxid)
  108. serverIp := ""
  109. adminPort := ""
  110. wxPort := ""
  111. ctype := ""
  112. if serverInfo != "" {
  113. infoArray := strings.Split(serverInfo, ":")
  114. serverIp, adminPort, wxPort, ctype = infoArray[0], infoArray[1], infoArray[2], infoArray[3]
  115. }
  116. _, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
  117. if err != nil {
  118. l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
  119. continue
  120. }
  121. var hookClient *hook.Hook
  122. if ctype == "3" { //企微
  123. hookClient = hook.NewWecomHook("", adminPort, wxPort)
  124. } else { //个微
  125. hookClient = hook.NewHook(serverIp, adminPort, wxPort)
  126. }
  127. if v.ContentType == 1 {
  128. content := v.Content
  129. if containsPlaceholder(content) {
  130. contactInfo, _ := getContactInfo(v.BotWxid, v.ContactWxid)
  131. content = varReplace(content, contactInfo)
  132. }
  133. msgId := time.Now().UnixNano() / int64(time.Microsecond)
  134. l.svcCtx.Rds.Set(l.ctx, fmt.Sprintf("MsgId_FriendId:%d", msgId), v.ContactWxid, 10*time.Minute)
  135. err = hookClient.SendTextMsg(v.ContactWxid, content, v.BotWxid, msgId)
  136. } else {
  137. content := v.Content
  138. re := regexp.MustCompile(`[^/]+$`)
  139. fileName := re.FindString(content)
  140. logx.Info("SendPicMsg content: ", content)
  141. logx.Info("SendPicMsg fileName: ", fileName)
  142. err = hookClient.SendPicMsg(v.ContactWxid, content, fileName, v.BotWxid)
  143. }
  144. if err != nil {
  145. _ = tx.Rollback()
  146. continue
  147. } else {
  148. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  149. if err != nil {
  150. _ = tx.Rollback()
  151. } else {
  152. _ = tx.Commit()
  153. }
  154. }
  155. time.Sleep(time.Duration(60/p.Number) * time.Second)
  156. } else {
  157. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  158. if err != nil {
  159. _ = tx.Rollback()
  160. } else {
  161. _ = tx.Commit()
  162. }
  163. }
  164. }
  165. finishTime := time.Now()
  166. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  167. return
  168. }
  169. func containsPlaceholder(s string) bool {
  170. pattern := `\$\{.*?\}`
  171. matched, _ := regexp.MatchString(pattern, s)
  172. return matched
  173. }
  174. func varReplace(s string, contactInfo *ent.Contact) string {
  175. nickname := ""
  176. var cname, carea, cbirthday, cbirtharea, cidcard_no, ctitle, sex, cage string
  177. if contactInfo != nil {
  178. nickname = contactInfo.Nickname
  179. carea = contactInfo.Carea
  180. cname = contactInfo.Cname
  181. cbirthday = contactInfo.Cbirthday
  182. cbirtharea = contactInfo.Cbirtharea
  183. cidcard_no = contactInfo.CidcardNo
  184. ctitle = contactInfo.Ctitle
  185. if contactInfo.Sex == 1 {
  186. sex = "男"
  187. } else if contactInfo.Sex == 2 {
  188. sex = "女"
  189. }
  190. age := contactInfo.Cage
  191. if age > 0 {
  192. cage = strconv.Itoa(age)
  193. }
  194. }
  195. s = strings.Replace(s, "${nickname}", nickname, -1)
  196. s = strings.Replace(s, "${cname}", cname, -1)
  197. s = strings.Replace(s, "${carea}", carea, -1)
  198. s = strings.Replace(s, "${cbirthday}", cbirthday, -1)
  199. s = strings.Replace(s, "${cbirtharea}", cbirtharea, -1)
  200. s = strings.Replace(s, "${cidcard_no}", cidcard_no, -1)
  201. s = strings.Replace(s, "${ctitle}", ctitle, -1)
  202. s = strings.Replace(s, "${sex}", sex, -1)
  203. s = strings.Replace(s, "${cage}", cage, -1)
  204. return s
  205. }