send_wx.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package crontask
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "regexp"
  7. "strconv"
  8. "strings"
  9. "time"
  10. "wechat-api/ent"
  11. "wechat-api/ent/contact"
  12. "wechat-api/ent/messagerecords"
  13. "wechat-api/ent/server"
  14. "wechat-api/ent/wx"
  15. "wechat-api/hook"
  16. "wechat-api/internal/types/payload"
  17. )
  18. func (l *CronTask) sendWx() {
  19. ctx := context.Background()
  20. config :=
  21. `{
  22. "ip": "172.18.41.219",
  23. "number": 10,
  24. "wx_list": [
  25. {
  26. "port": "30001",
  27. "wxid": "wxid_77au928zeb2p12",
  28. "nickname": "百事通"
  29. },
  30. {
  31. "port": "30007",
  32. "wxid": "wxid_edc0mvp188ms22",
  33. "nickname": "爱博闻"
  34. },
  35. {
  36. "port": "30040",
  37. "wxid": "wxid_wupstwe851nb12",
  38. "nickname": "轻马小助手"
  39. }
  40. ]
  41. }`
  42. var p payload.SendWxPayload
  43. if err := json.Unmarshal([]byte(config), &p); err != nil {
  44. l.Logger.Errorf("failed to unmarshal the payload :%v", err)
  45. return
  46. }
  47. // 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
  48. messages, err := l.svcCtx.DB.MessageRecords.Query().
  49. Where(messagerecords.StatusIn(1, 4)).
  50. Order(ent.Asc(messagerecords.FieldCreatedAt)).
  51. Limit(p.Number).
  52. All(ctx)
  53. if err != nil {
  54. l.Logger.Errorf("get messageRecords list failed %v", err)
  55. return
  56. }
  57. startTime := time.Now()
  58. type SendTextMsgReq struct {
  59. Wxid string `json:"wxid"`
  60. Msg string `json:"msg"`
  61. }
  62. type SendFileMsgReq struct {
  63. Wxid string `json:"wxid"`
  64. Filepath string `json:"filepath"`
  65. Diyfilename string `json:"diyfilename"`
  66. }
  67. // 从 Redis 中获取服务器信息(ip, port)
  68. getServerInfo := func(wxid string) (string, error) {
  69. key := "crontask_wx_server_info"
  70. val, _ := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
  71. if val == "" {
  72. wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
  73. if err != nil {
  74. l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
  75. return "", err
  76. }
  77. if wx.ServerID != 0 {
  78. server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
  79. if err != nil {
  80. l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
  81. return "", err
  82. }
  83. ctype := strconv.Itoa(int(wx.Ctype))
  84. val = server.PrivateIP + ":" + server.AdminPort + ":" + wx.Port + ":" + ctype
  85. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  86. } else {
  87. val = "0:0" + ":" + wx.Port + ":" + strconv.Itoa(int(wx.Ctype))
  88. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  89. }
  90. }
  91. return val, nil
  92. }
  93. // 从 Redis 中获取服务器信息(ip, port)
  94. getContactInfo := func(botwxid string, wxid string) (*ent.Contact, error) {
  95. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
  96. contact.WxWxidEQ(botwxid),
  97. contact.WxidEQ(wxid),
  98. contact.CtypeIn(1, 3),
  99. ).First(l.ctx)
  100. return contactInfo, nil
  101. }
  102. for _, v := range messages {
  103. // 更新 status 值为 2(发送中)
  104. tx, _ := l.svcCtx.DB.Tx(l.ctx)
  105. if v.Content != "" {
  106. serverInfo, _ := getServerInfo(v.BotWxid)
  107. serverIp := ""
  108. adminPort := ""
  109. wxPort := ""
  110. ctype := ""
  111. if serverInfo != "" {
  112. infoArray := strings.Split(serverInfo, ":")
  113. serverIp, adminPort, wxPort, ctype = infoArray[0], infoArray[1], infoArray[2], infoArray[3]
  114. }
  115. _, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
  116. if err != nil {
  117. l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
  118. continue
  119. }
  120. var hookClient *hook.Hook
  121. if ctype == "3" { //企微
  122. hookClient = hook.NewWecomHook("", adminPort, wxPort)
  123. } else { //个微
  124. hookClient = hook.NewHook(serverIp, adminPort, wxPort)
  125. }
  126. if v.ContentType == 1 {
  127. content := v.Content
  128. if containsPlaceholder(content) {
  129. contactInfo, _ := getContactInfo(v.BotWxid, v.ContactWxid)
  130. content = varReplace(content, contactInfo)
  131. }
  132. msgId := time.Now().UnixNano() / int64(time.Microsecond)
  133. l.svcCtx.Rds.Set(l.ctx, fmt.Sprintf("MsgId_FriendId:%d", msgId), v.ContactWxid, 10*time.Minute)
  134. err = hookClient.SendTextMsg(v.ContactWxid, content, v.BotWxid, msgId)
  135. } else {
  136. re := regexp.MustCompile(`[^/]+$`)
  137. fileName := re.FindString(v.Content)
  138. err = hookClient.SendPicMsg(v.ContactWxid, v.Content, fileName, v.BotWxid)
  139. }
  140. if err != nil {
  141. _ = tx.Rollback()
  142. continue
  143. } else {
  144. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  145. if err != nil {
  146. _ = tx.Rollback()
  147. } else {
  148. _ = tx.Commit()
  149. }
  150. }
  151. time.Sleep(time.Duration(60/p.Number) * time.Second)
  152. } else {
  153. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  154. if err != nil {
  155. _ = tx.Rollback()
  156. } else {
  157. _ = tx.Commit()
  158. }
  159. }
  160. }
  161. finishTime := time.Now()
  162. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  163. return
  164. }
  165. func containsPlaceholder(s string) bool {
  166. pattern := `\$\{.*?\}`
  167. matched, _ := regexp.MatchString(pattern, s)
  168. return matched
  169. }
  170. func varReplace(s string, contactInfo *ent.Contact) string {
  171. nickname := ""
  172. var cname, carea, cbirthday, cbirtharea, cidcard_no, ctitle, sex, cage string
  173. if contactInfo != nil {
  174. nickname = contactInfo.Nickname
  175. carea = contactInfo.Carea
  176. cname = contactInfo.Cname
  177. cbirthday = contactInfo.Cbirthday
  178. cbirtharea = contactInfo.Cbirtharea
  179. cidcard_no = contactInfo.CidcardNo
  180. ctitle = contactInfo.Ctitle
  181. if contactInfo.Sex == 1 {
  182. sex = "男"
  183. } else if contactInfo.Sex == 2 {
  184. sex = "女"
  185. }
  186. age := contactInfo.Cage
  187. if age > 0 {
  188. cage = strconv.Itoa(age)
  189. }
  190. }
  191. s = strings.Replace(s, "${nickname}", nickname, -1)
  192. s = strings.Replace(s, "${cname}", cname, -1)
  193. s = strings.Replace(s, "${carea}", carea, -1)
  194. s = strings.Replace(s, "${cbirthday}", cbirthday, -1)
  195. s = strings.Replace(s, "${cbirtharea}", cbirtharea, -1)
  196. s = strings.Replace(s, "${cidcard_no}", cidcard_no, -1)
  197. s = strings.Replace(s, "${ctitle}", ctitle, -1)
  198. s = strings.Replace(s, "${sex}", sex, -1)
  199. s = strings.Replace(s, "${cage}", cage, -1)
  200. return s
  201. }