send_wx.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package crontask
  2. import (
  3. "context"
  4. "encoding/json"
  5. "github.com/imroc/req/v3"
  6. "strings"
  7. "time"
  8. "wechat-api/ent"
  9. "wechat-api/ent/messagerecords"
  10. "wechat-api/ent/server"
  11. "wechat-api/ent/wx"
  12. "wechat-api/internal/types/payload"
  13. )
  14. func (l *CronTask) sendWx() {
  15. ctx := context.Background()
  16. config :=
  17. `{
  18. "ip": "172.18.41.219",
  19. "number": 10,
  20. "wx_list": [
  21. {
  22. "port": "30001",
  23. "wxid": "wxid_77au928zeb2p12",
  24. "nickname": "百事通"
  25. },
  26. {
  27. "port": "30007",
  28. "wxid": "wxid_edc0mvp188ms22",
  29. "nickname": "爱博闻"
  30. },
  31. {
  32. "port": "30040",
  33. "wxid": "wxid_wupstwe851nb12",
  34. "nickname": "轻马小助手"
  35. }
  36. ]
  37. }`
  38. var p payload.SendWxPayload
  39. if err := json.Unmarshal([]byte(config), &p); err != nil {
  40. l.Logger.Errorf("failed to unmarshal the payload :%v", err)
  41. return
  42. }
  43. // 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
  44. messages, err := l.svcCtx.DB.MessageRecords.Query().
  45. Where(messagerecords.StatusIn(1, 4)).
  46. Order(ent.Asc(messagerecords.FieldCreatedAt)).
  47. Limit(p.Number).
  48. All(ctx)
  49. if err != nil {
  50. l.Logger.Errorf("get messageRecords list failed %v", err)
  51. return
  52. }
  53. startTime := time.Now()
  54. client := req.C().DevMode()
  55. client.SetCommonRetryCount(2).
  56. SetCommonRetryBackoffInterval(1*time.Second, 5*time.Second).
  57. SetCommonRetryFixedInterval(2 * time.Second).SetTimeout(30 * time.Second)
  58. type SendTextMsgReq struct {
  59. Wxid string `json:"wxid"`
  60. Msg string `json:"msg"`
  61. }
  62. type SendFileMsgReq struct {
  63. Wxid string `json:"wxid"`
  64. Filepath string `json:"filepath"`
  65. Diyfilename string `json:"diyfilename"`
  66. }
  67. // 从 Redis 中获取服务器信息(ip, port)
  68. getServerInfo := func(wxid string) (string, error) {
  69. key := "crontask_wx_server_info"
  70. val, err := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
  71. if err != nil {
  72. return "", err
  73. }
  74. if val == "" {
  75. wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
  76. if err != nil {
  77. l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
  78. return "", err
  79. }
  80. server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
  81. if err != nil {
  82. l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
  83. return "", err
  84. }
  85. val = server.PrivateIP + ":" + server.AdminPort
  86. l.svcCtx.Rds.HSet(ctx, key, wxid, val)
  87. }
  88. return val, nil
  89. }
  90. for _, v := range messages {
  91. serverInfo, _ := getServerInfo(v.BotWxid)
  92. infoArray := strings.Split(serverInfo, ":")
  93. ip, wxPort := infoArray[0], infoArray[1]
  94. // 更新 status 值为 2(发送中)
  95. tx, err := l.svcCtx.DB.Tx(l.ctx)
  96. _, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
  97. if err != nil {
  98. l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
  99. continue
  100. }
  101. if v.ContentType == 1 {
  102. _, err = client.R().SetBody(&SendTextMsgReq{
  103. Wxid: v.ContactWxid,
  104. Msg: v.Content,
  105. }).Post("http://" + ip + ":" + wxPort + "/SendTextMsg")
  106. } else {
  107. _, err = client.R().SetBody(&SendFileMsgReq{
  108. Wxid: v.ContactWxid,
  109. Filepath: v.Content,
  110. Diyfilename: v.Meta.Filename,
  111. }).Post("http://" + ip + ":" + wxPort + "/SendFileMsg")
  112. }
  113. if err != nil {
  114. _ = tx.Rollback()
  115. continue
  116. } else {
  117. _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
  118. if err != nil {
  119. _ = tx.Rollback()
  120. } else {
  121. _ = tx.Commit()
  122. }
  123. }
  124. time.Sleep(time.Duration(60/p.Number) * time.Second)
  125. }
  126. finishTime := time.Now()
  127. l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
  128. return
  129. }