send_wx.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package crontask
  2. import (
  3. "context"
  4. "encoding/json"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "wechat-api/ent"
  10. "wechat-api/ent/contact"
  11. "wechat-api/ent/messagerecords"
  12. "wechat-api/ent/server"
  13. "wechat-api/ent/wx"
  14. "wechat-api/hook"
  15. "wechat-api/internal/types/payload"
  16. )
  17. func (l *CronTask) sendWx() {
  18. ctx := context.Background()
  19. config :=
  20. `{
  21. "ip": "172.18.41.219",
  22. "number": 10,
  23. "wx_list": [
  24. {
  25. "port": "30001",
  26. "wxid": "wxid_77au928zeb2p12",
  27. "nickname": "百事通"
  28. },
  29. {
  30. "port": "30007",
  31. "wxid": "wxid_edc0mvp188ms22",
  32. "nickname": "爱博闻"
  33. },
  34. {
  35. "port": "30040",
  36. "wxid": "wxid_wupstwe851nb12",
  37. "nickname": "轻马小助手"
  38. }
  39. ]
  40. }`
  41. var p payload.SendWxPayload
  42. if err := json.Unmarshal([]byte(config), &p); err != nil {
  43. l.Logger.Errorf("failed to unmarshal the payload :%v", err)
  44. return
  45. }
  46. // 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
  47. messages, err := l.svcCtx.DB.MessageRecords.Query().
  48. Where(messagerecords.StatusIn(1, 4)).
  49. Order(ent.Asc(messagerecords.FieldCreatedAt)).
  50. Limit(p.Number).
  51. All(ctx)
  52. if err != nil {
  53. l.Logger.Errorf("get messageRecords list failed %v", err)
  54. return
  55. }
  56. startTime := time.Now()
  57. type SendTextMsgReq struct {
  58. Wxid string `json:"wxid"`
  59. Msg string `json:"msg"`
  60. }
  61. type SendFileMsgReq struct {
  62. Wxid string `json:"wxid"`
  63. Filepath string `json:"filepath"`
  64. Diyfilename string `json:"diyfilename"`
  65. }
  66. // 从 Redis 中获取服务器信息(ip, port)
  67. getServerInfo := func(wxid string) (string, error) {
  68. key := "crontask_wx_server_info"
  69. val, _ := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
  70. //if err != nil {
  71. // return "", err
  72. //}
  73. if val == "" {
  74. wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
  75. if err != nil {
  76. l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
  77. return "", err
  78. }
  79. if wx.ServerID != 0 {
  80. server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
  81. if err != nil {
  82. l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
  83. return "", err
  84. }
  85. val = server.PrivateIP + ":" + server.AdminPort + ":" + wx.Port
  86. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  87. }
  88. }
  89. return val, nil
  90. }
  91. // 从 Redis 中获取服务器信息(ip, port)
  92. getContactInfo := func(botwxid string, wxid string) (*ent.Contact, error) {
  93. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
  94. contact.WxWxidEQ(botwxid),
  95. contact.WxidEQ(wxid),
  96. contact.Ctype(1),
  97. ).First(l.ctx)
  98. return contactInfo, nil
  99. }
  100. for _, v := range messages {
  101. // 更新 status 值为 2(发送中)
  102. tx, _ := l.svcCtx.DB.Tx(l.ctx)
  103. if v.Content != "" {
  104. serverInfo, _ := getServerInfo(v.BotWxid)
  105. serverIp := ""
  106. adminPort := ""
  107. wxPort := ""
  108. if serverInfo != "" {
  109. infoArray := strings.Split(serverInfo, ":")
  110. serverIp, adminPort, wxPort = infoArray[0], infoArray[1], infoArray[2]
  111. }
  112. _, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
  113. if err != nil {
  114. l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
  115. continue
  116. }
  117. hookClient := hook.NewHook(serverIp, adminPort, wxPort)
  118. if v.ContentType == 1 {
  119. content := v.Content
  120. if containsPlaceholder(content) {
  121. contactInfo, _ := getContactInfo(v.BotWxid, v.ContactWxid)
  122. content = varReplace(content, contactInfo)
  123. }
  124. err = hookClient.SendTextMsg(v.ContactWxid, content, v.BotWxid)
  125. } else {
  126. re := regexp.MustCompile(`[^/]+$`)
  127. fileName := re.FindString(v.Content)
  128. err = hookClient.SendPicMsg(v.ContactWxid, v.Content, fileName, v.BotWxid)
  129. }
  130. if err != nil {
  131. _ = tx.Rollback()
  132. continue
  133. } else {
  134. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  135. if err != nil {
  136. _ = tx.Rollback()
  137. } else {
  138. _ = tx.Commit()
  139. }
  140. }
  141. time.Sleep(time.Duration(60/p.Number) * time.Second)
  142. } else {
  143. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  144. if err != nil {
  145. _ = tx.Rollback()
  146. } else {
  147. _ = tx.Commit()
  148. }
  149. }
  150. }
  151. finishTime := time.Now()
  152. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  153. return
  154. }
  // placeholderRe matches a single ${...} template variable. The body may be
  // empty and, because "." does not match a newline, cannot span lines. It is
  // compiled once at package initialisation instead of on every call.
  var placeholderRe = regexp.MustCompile(`\$\{.*?\}`)

  // containsPlaceholder reports whether s contains at least one ${...}
  // template variable.
  func containsPlaceholder(s string) bool {
  	return placeholderRe.MatchString(s)
  }
  160. func varReplace(s string, contactInfo *ent.Contact) string {
  161. nickname := ""
  162. var cname, carea, cbirthday, cbirtharea, cidcard_no, ctitle, sex, cage string
  163. if contactInfo != nil {
  164. nickname = contactInfo.Nickname
  165. carea = contactInfo.Carea
  166. cname = contactInfo.Cname
  167. cbirthday = contactInfo.Cbirthday
  168. cbirtharea = contactInfo.Cbirtharea
  169. cidcard_no = contactInfo.CidcardNo
  170. ctitle = contactInfo.Ctitle
  171. if contactInfo.Sex == 1 {
  172. sex = "男"
  173. } else if contactInfo.Sex == 2 {
  174. sex = "女"
  175. }
  176. age := contactInfo.Cage
  177. if age > 0 {
  178. cage = strconv.Itoa(age)
  179. }
  180. }
  181. s = strings.Replace(s, "${nickname}", nickname, -1)
  182. s = strings.Replace(s, "${cname}", cname, -1)
  183. s = strings.Replace(s, "${carea}", carea, -1)
  184. s = strings.Replace(s, "${cbirthday}", cbirthday, -1)
  185. s = strings.Replace(s, "${cbirtharea}", cbirtharea, -1)
  186. s = strings.Replace(s, "${cidcard_no}", cidcard_no, -1)
  187. s = strings.Replace(s, "${ctitle}", ctitle, -1)
  188. s = strings.Replace(s, "${sex}", sex, -1)
  189. s = strings.Replace(s, "${cage}", cage, -1)
  190. return s
  191. }