package crontask import ( "context" "fmt" "reflect" "time" _ "time/tzdata" "wechat-api/ent" "wechat-api/internal/service/addfriend" "wechat-api/internal/svc" "wechat-api/internal/utils/compapi" "github.com/zeromicro/go-zero/core/logx" ) const ( WX_FREQKEY = "WX_DAILY_FREQKEY" WX_DAILY_ADDCOUNT = 30 ) type wxAddFreqCtl struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func (me *wxAddFreqCtl) getDailyWxIDKey(wxID string) string { now, _ := getLocNowTime() dateStr := now.Format("20060102") // Go 的日期格式化字符串 return fmt.Sprintf("%s:%s:%s", WX_FREQKEY, wxID, dateStr) } func getLocNowTime() (time.Time, *time.Location) { loc, err := time.LoadLocation("Asia/Shanghai") if err != nil { fmt.Println(err) } now := time.Now() if loc != nil { now = now.In(loc) } return now, loc } func (me *wxAddFreqCtl) Check(wxID string) (bool, int64, error) { key := me.getDailyWxIDKey(wxID) //me.svcCtx.Rds.Del(me.ctx, key) count, err := me.svcCtx.Rds.Incr(me.ctx, key).Result() if err != nil { return false, 0, err } //如果是当天的第一次发送 (count == 1),设置过期时间 if count == 1 { // 计算到明天零点的秒数 now, loc := getLocNowTime() tomorrow := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, loc) expiration := tomorrow.Sub(now) // 设置过期时间 // 添加一个小的缓冲时间(例如 5 分钟),防止临界点问题 expireCmd := me.svcCtx.Rds.Expire(me.ctx, key, expiration+5*time.Minute) if expireCmd.Err() != nil { me.Logger.Errorf("redis set expire error: %v", expireCmd.Err()) } } return count <= WX_DAILY_ADDCOUNT, count, nil } func NewWxAddFreqCtl(ctx context.Context, svcCtx *svc.ServiceContext) *wxAddFreqCtl { return &wxAddFreqCtl{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx} } func (l *CronTask) CliwxAddFriend() { l.wxAddFriend() } func (l *CronTask) wxAddFriend() { tasks, err := l.getTaskList() if err != nil { fmt.Println(err) l.Logger.Errorf("get AddWechatFriendLog TaskList %v", err) return } serv := addfriend.NewAddWechatFriendService(l.ctx, l.svcCtx) servFreqCtl := NewWxAddFreqCtl(l.ctx, l.svcCtx) canAdd := false addCount := int64(0) for _, task := range tasks { canAdd, addCount, err = servFreqCtl.Check(task.OwnerWxID) if err != nil { l.Logger.Errorf("check add freq error:%v", err) continue } if !canAdd { l.Logger.Infof("WxID:'%s' AddNewFriend today count:%d over limit:%d", task.OwnerWxID, addCount, WX_DAILY_ADDCOUNT) continue } ok := serv.AddNewFriend(task.OwnerWxID, task.FindContent, task.Message) l.Logger.Debugf("serv.AddNewFriend()=>%v", ok) } } func (me *CronTask) getTaskList() ([]*ent.AddWechatFriendLog, error) { taskrow := ent.AddWechatFriendLog{} mapAnyType := reflect.TypeOf(taskrow.FindRequest) fieldListStr, _, err := compapi.EntStructGenScanField(&taskrow, mapAnyType) if err != nil { return nil, err } rawQuery := fmt.Sprintf(` WITH RandRanked AS ( SELECT %s,ROW_NUMBER() OVER(PARTITION BY owner_wx_id ORDER BY id DESC) as rn FROM add_wechat_friend_log WHERE is_can_add = 1 AND task_count < 3) SELECT %s FROM RandRanked WHERE rn = 1;`, fieldListStr, fieldListStr) //fmt.Println(rawQuery) // 执行原始查询 rows, err := me.svcCtx.DB.QueryContext(me.ctx, rawQuery) if err != nil { return nil, fmt.Errorf("fetch fair tasks query error: %w", err) } defer rows.Close() Idx := 0 tasks := []*ent.AddWechatFriendLog{} for rows.Next() { var scanParams []any mapAnyType := reflect.TypeOf(taskrow.FindRequest) _, scanParams, err = compapi.EntStructGenScanField(&taskrow, mapAnyType) if err != nil { break } err = rows.Scan(scanParams...) if err != nil { break } tasks = append(tasks, &taskrow) Idx++ } return tasks, nil }