send_wx.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package crontask
  2. import (
  3. "context"
  4. "encoding/json"
  5. "regexp"
  6. "strings"
  7. "time"
  8. "wechat-api/ent"
  9. "wechat-api/ent/contact"
  10. "wechat-api/ent/messagerecords"
  11. "wechat-api/ent/server"
  12. "wechat-api/ent/wx"
  13. "wechat-api/hook"
  14. "wechat-api/internal/types/payload"
  15. )
  16. func (l *CronTask) sendWx() {
  17. ctx := context.Background()
  18. config :=
  19. `{
  20. "ip": "172.18.41.219",
  21. "number": 10,
  22. "wx_list": [
  23. {
  24. "port": "30001",
  25. "wxid": "wxid_77au928zeb2p12",
  26. "nickname": "百事通"
  27. },
  28. {
  29. "port": "30007",
  30. "wxid": "wxid_edc0mvp188ms22",
  31. "nickname": "爱博闻"
  32. },
  33. {
  34. "port": "30040",
  35. "wxid": "wxid_wupstwe851nb12",
  36. "nickname": "轻马小助手"
  37. }
  38. ]
  39. }`
  40. var p payload.SendWxPayload
  41. if err := json.Unmarshal([]byte(config), &p); err != nil {
  42. l.Logger.Errorf("failed to unmarshal the payload :%v", err)
  43. return
  44. }
  45. // 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
  46. messages, err := l.svcCtx.DB.MessageRecords.Query().
  47. Where(messagerecords.StatusIn(1, 4)).
  48. Order(ent.Asc(messagerecords.FieldCreatedAt)).
  49. Limit(p.Number).
  50. All(ctx)
  51. if err != nil {
  52. l.Logger.Errorf("get messageRecords list failed %v", err)
  53. return
  54. }
  55. startTime := time.Now()
  56. type SendTextMsgReq struct {
  57. Wxid string `json:"wxid"`
  58. Msg string `json:"msg"`
  59. }
  60. type SendFileMsgReq struct {
  61. Wxid string `json:"wxid"`
  62. Filepath string `json:"filepath"`
  63. Diyfilename string `json:"diyfilename"`
  64. }
  65. // 从 Redis 中获取服务器信息(ip, port)
  66. getServerInfo := func(wxid string) (string, error) {
  67. key := "crontask_wx_server_info"
  68. val, _ := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
  69. //if err != nil {
  70. // return "", err
  71. //}
  72. if val == "" {
  73. wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
  74. if err != nil {
  75. l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
  76. return "", err
  77. }
  78. if wx.ServerID != 0 {
  79. server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
  80. if err != nil {
  81. l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
  82. return "", err
  83. }
  84. val = server.PrivateIP + ":" + server.AdminPort + ":" + wx.Port
  85. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  86. }
  87. }
  88. return val, nil
  89. }
  90. // 从 Redis 中获取服务器信息(ip, port)
  91. getContactInfo := func(botwxid string, wxid string) (*ent.Contact, error) {
  92. contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(contact.WxWxidEQ(botwxid), contact.WxidEQ(wxid)).First(l.ctx)
  93. return contactInfo, nil
  94. }
  95. for _, v := range messages {
  96. // 更新 status 值为 2(发送中)
  97. tx, _ := l.svcCtx.DB.Tx(l.ctx)
  98. if v.Content != "" {
  99. serverInfo, _ := getServerInfo(v.BotWxid)
  100. serverIp := ""
  101. adminPort := ""
  102. wxPort := ""
  103. if serverInfo != "" {
  104. infoArray := strings.Split(serverInfo, ":")
  105. serverIp, adminPort, wxPort = infoArray[0], infoArray[1], infoArray[2]
  106. }
  107. _, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
  108. if err != nil {
  109. l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
  110. continue
  111. }
  112. hookClient := hook.NewHook(serverIp, adminPort, wxPort)
  113. if v.ContentType == 1 {
  114. content := v.Content
  115. if containsPlaceholder(content) {
  116. contactInfo, _ := getContactInfo(v.BotWxid, v.ContactWxid)
  117. content = varReplace(content, contactInfo)
  118. }
  119. err = hookClient.SendTextMsg(v.ContactWxid, content, v.BotWxid)
  120. } else {
  121. re := regexp.MustCompile(`[^/]+$`)
  122. fileName := re.FindString(v.Content)
  123. err = hookClient.SendPicMsg(v.ContactWxid, v.Content, fileName, v.BotWxid)
  124. }
  125. if err != nil {
  126. _ = tx.Rollback()
  127. continue
  128. } else {
  129. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  130. if err != nil {
  131. _ = tx.Rollback()
  132. } else {
  133. _ = tx.Commit()
  134. }
  135. }
  136. time.Sleep(time.Duration(60/p.Number) * time.Second)
  137. } else {
  138. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  139. if err != nil {
  140. _ = tx.Rollback()
  141. } else {
  142. _ = tx.Commit()
  143. }
  144. }
  145. }
  146. finishTime := time.Now()
  147. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  148. return
  149. }
  150. func containsPlaceholder(s string) bool {
  151. pattern := `\$\{.*?\}`
  152. matched, _ := regexp.MatchString(pattern, s)
  153. return matched
  154. }
  155. func varReplace(s string, contactInfo *ent.Contact) string {
  156. nickname := ""
  157. if contactInfo != nil {
  158. nickname = contactInfo.Nickname
  159. }
  160. s = strings.Replace(s, "${nickname}", nickname, -1)
  161. return s
  162. }