wx_add_friend.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package crontask
  2. import (
  3. "context"
  4. "fmt"
  5. "reflect"
  6. "time"
  7. _ "time/tzdata"
  8. "wechat-api/ent"
  9. "wechat-api/internal/service/addfriend"
  10. "wechat-api/internal/svc"
  11. "wechat-api/internal/utils/compapi"
  12. "github.com/zeromicro/go-zero/core/logx"
  13. )
  // WX_FREQKEY is the leading segment of every per-account daily counter key
  // built by getDailyWxIDKey.
  const WX_FREQKEY = "WX_DAILY_FREQKEY"

  // WX_DAILY_ADDCOUNT is the highest counter value for which
  // wxAddFreqCtl.Check still reports that an add-friend attempt is allowed.
  const WX_DAILY_ADDCOUNT = 30
  18. type wxAddFreqCtl struct {
  19. logx.Logger
  20. ctx context.Context
  21. svcCtx *svc.ServiceContext
  22. }
  23. func (me *wxAddFreqCtl) getDailyWxIDKey(wxID string) string {
  24. now, _ := getLocNowTime()
  25. dateStr := now.Format("20060102") // Go 的日期格式化字符串
  26. return fmt.Sprintf("%s:%s:%s", WX_FREQKEY, wxID, dateStr)
  27. }
  // getLocNowTime returns the current time expressed in the Asia/Shanghai zone
  // together with the *time.Location that was used.
  //
  // The returned location is never nil. If the zone database cannot be loaded,
  // the error is printed and a fixed UTC+8 zone is used instead, so callers can
  // safely pass the location to time.Date (which panics on a nil location).
  func getLocNowTime() (time.Time, *time.Location) {
  	loc, err := time.LoadLocation("Asia/Shanghai")
  	if err != nil {
  		fmt.Println(err)
  		// Asia/Shanghai has been a constant UTC+8 without daylight saving
  		// since 1991, so a fixed zone yields the same wall-clock time.
  		loc = time.FixedZone("Asia/Shanghai", 8*60*60)
  	}
  	return time.Now().In(loc), loc
  }
  39. func (me *wxAddFreqCtl) Check(wxID string) (bool, int64, error) {
  40. key := me.getDailyWxIDKey(wxID)
  41. //me.svcCtx.Rds.Del(me.ctx, key)
  42. count, err := me.svcCtx.Rds.Incr(me.ctx, key).Result()
  43. if err != nil {
  44. return false, 0, err
  45. }
  46. //如果是当天的第一次发送 (count == 1),设置过期时间
  47. if count == 1 {
  48. // 计算到明天零点的秒数
  49. now, loc := getLocNowTime()
  50. tomorrow := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, loc)
  51. expiration := tomorrow.Sub(now)
  52. // 设置过期时间
  53. // 添加一个小的缓冲时间(例如 5 分钟),防止临界点问题
  54. expireCmd := me.svcCtx.Rds.Expire(me.ctx, key, expiration+5*time.Minute)
  55. if expireCmd.Err() != nil {
  56. me.Logger.Errorf("redis set expire error: %v", expireCmd.Err())
  57. }
  58. }
  59. return count <= WX_DAILY_ADDCOUNT, count, nil
  60. }
  61. func NewWxAddFreqCtl(ctx context.Context, svcCtx *svc.ServiceContext) *wxAddFreqCtl {
  62. return &wxAddFreqCtl{
  63. Logger: logx.WithContext(ctx),
  64. ctx: ctx,
  65. svcCtx: svcCtx}
  66. }
  67. func (l *CronTask) CliwxAddFriend() {
  68. l.wxAddFriend()
  69. }
  70. func (l *CronTask) wxAddFriend() {
  71. tasks, err := l.getTaskList()
  72. if err != nil {
  73. fmt.Println(err)
  74. l.Logger.Errorf("get AddWechatFriendLog TaskList %v", err)
  75. return
  76. }
  77. serv := addfriend.NewAddWechatFriendService(l.ctx, l.svcCtx)
  78. servFreqCtl := NewWxAddFreqCtl(l.ctx, l.svcCtx)
  79. canAdd := false
  80. addCount := int64(0)
  81. for _, task := range tasks {
  82. canAdd, addCount, err = servFreqCtl.Check(task.OwnerWxID)
  83. if err != nil {
  84. l.Logger.Errorf("check add freq error:%v", err)
  85. continue
  86. }
  87. if !canAdd {
  88. l.Logger.Infof("WxID:'%s' AddNewFriend today count:%d over limit:%d",
  89. task.OwnerWxID, addCount, WX_DAILY_ADDCOUNT)
  90. continue
  91. }
  92. ok := serv.AddNewFriend(task.OwnerWxID, task.FindContent, task.Message)
  93. l.Logger.Debugf("serv.AddNewFriend()=>%v", ok)
  94. }
  95. }
  96. func (me *CronTask) getTaskList() ([]*ent.AddWechatFriendLog, error) {
  97. taskrow := ent.AddWechatFriendLog{}
  98. mapAnyType := reflect.TypeOf(taskrow.FindRequest)
  99. fieldListStr, _, err := compapi.EntStructGenScanField(&taskrow, mapAnyType)
  100. if err != nil {
  101. return nil, err
  102. }
  103. rawQuery := fmt.Sprintf(`
  104. WITH RandRanked AS (
  105. SELECT %s,ROW_NUMBER() OVER(PARTITION BY owner_wx_id ORDER BY id DESC) as rn
  106. FROM add_wechat_friend_log
  107. WHERE is_can_add = 1 AND task_count < 3)
  108. SELECT %s FROM RandRanked WHERE rn = 1;`, fieldListStr, fieldListStr)
  109. //fmt.Println(rawQuery)
  110. // 执行原始查询
  111. rows, err := me.svcCtx.DB.QueryContext(me.ctx, rawQuery)
  112. if err != nil {
  113. return nil, fmt.Errorf("fetch fair tasks query error: %w", err)
  114. }
  115. defer rows.Close()
  116. Idx := 0
  117. tasks := []*ent.AddWechatFriendLog{}
  118. for rows.Next() {
  119. var scanParams []any
  120. mapAnyType := reflect.TypeOf(taskrow.FindRequest)
  121. _, scanParams, err = compapi.EntStructGenScanField(&taskrow, mapAnyType)
  122. if err != nil {
  123. break
  124. }
  125. err = rows.Scan(scanParams...)
  126. if err != nil {
  127. break
  128. }
  129. tasks = append(tasks, &taskrow)
  130. Idx++
  131. }
  132. return tasks, nil
  133. }