123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- package crontask
- import (
- "context"
- "encoding/json"
- "regexp"
- "strconv"
- "strings"
- "time"
- "wechat-api/ent"
- "wechat-api/ent/contact"
- "wechat-api/ent/messagerecords"
- "wechat-api/ent/server"
- "wechat-api/ent/wx"
- "wechat-api/hook"
- "wechat-api/internal/types/payload"
- )
- func (l *CronTask) sendWx() {
- ctx := context.Background()
- config :=
- `{
- "ip": "172.18.41.219",
- "number": 10,
- "wx_list": [
- {
- "port": "30001",
- "wxid": "wxid_77au928zeb2p12",
- "nickname": "百事通"
- },
- {
- "port": "30007",
- "wxid": "wxid_edc0mvp188ms22",
- "nickname": "爱博闻"
- },
- {
- "port": "30040",
- "wxid": "wxid_wupstwe851nb12",
- "nickname": "轻马小助手"
- }
- ]
- }`
- var p payload.SendWxPayload
- if err := json.Unmarshal([]byte(config), &p); err != nil {
- l.Logger.Errorf("failed to unmarshal the payload :%v", err)
- return
- }
- // 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
- messages, err := l.svcCtx.DB.MessageRecords.Query().
- Where(messagerecords.StatusIn(1, 4)).
- Order(ent.Asc(messagerecords.FieldCreatedAt)).
- Limit(p.Number).
- All(ctx)
- if err != nil {
- l.Logger.Errorf("get messageRecords list failed %v", err)
- return
- }
- startTime := time.Now()
- type SendTextMsgReq struct {
- Wxid string `json:"wxid"`
- Msg string `json:"msg"`
- }
- type SendFileMsgReq struct {
- Wxid string `json:"wxid"`
- Filepath string `json:"filepath"`
- Diyfilename string `json:"diyfilename"`
- }
- // 从 Redis 中获取服务器信息(ip, port)
- getServerInfo := func(wxid string) (string, error) {
- key := "crontask_wx_server_info"
- val, _ := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
- //if err != nil {
- // return "", err
- //}
- if val == "" {
- wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
- if err != nil {
- l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
- return "", err
- }
- if wx.ServerID != 0 {
- server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
- if err != nil {
- l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
- return "", err
- }
- ctype := strconv.Itoa(int(wx.Ctype))
- val = server.PrivateIP + ":" + server.AdminPort + ":" + wx.Port + ":" + ctype
- l.svcCtx.Rds.HSet(ctx, key, wxid, val)
- }
- }
- return val, nil
- }
- // 从 Redis 中获取服务器信息(ip, port)
- getContactInfo := func(botwxid string, wxid string) (*ent.Contact, error) {
- contactInfo, _ := l.svcCtx.DB.Contact.Query().Where(
- contact.WxWxidEQ(botwxid),
- contact.WxidEQ(wxid),
- contact.CtypeIn(1, 3),
- ).First(l.ctx)
- return contactInfo, nil
- }
- for _, v := range messages {
- // 更新 status 值为 2(发送中)
- tx, _ := l.svcCtx.DB.Tx(l.ctx)
- if v.Content != "" {
- serverInfo, _ := getServerInfo(v.BotWxid)
- serverIp := ""
- adminPort := ""
- wxPort := ""
- ctype := ""
- if serverInfo != "" {
- infoArray := strings.Split(serverInfo, ":")
- serverIp, adminPort, wxPort, ctype = infoArray[0], infoArray[1], infoArray[2], infoArray[3]
- }
- _, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
- if err != nil {
- l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
- continue
- }
- var hookClient *hook.Hook
- if ctype == "3" { //企微
- hookClient = hook.NewWecomHook("", adminPort, wxPort)
- } else { //个微
- hookClient = hook.NewHook(serverIp, adminPort, wxPort)
- }
- if v.ContentType == 1 {
- content := v.Content
- if containsPlaceholder(content) {
- contactInfo, _ := getContactInfo(v.BotWxid, v.ContactWxid)
- content = varReplace(content, contactInfo)
- }
- err = hookClient.SendTextMsg(v.ContactWxid, content, v.BotWxid)
- } else {
- re := regexp.MustCompile(`[^/]+$`)
- fileName := re.FindString(v.Content)
- err = hookClient.SendPicMsg(v.ContactWxid, v.Content, fileName, v.BotWxid)
- }
- if err != nil {
- _ = tx.Rollback()
- continue
- } else {
- _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
- if err != nil {
- _ = tx.Rollback()
- } else {
- _ = tx.Commit()
- }
- }
- time.Sleep(time.Duration(60/p.Number) * time.Second)
- } else {
- _, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
- if err != nil {
- _ = tx.Rollback()
- } else {
- _ = tx.Commit()
- }
- }
- }
- finishTime := time.Now()
- l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
- return
- }
// placeholderRe matches a ${...} placeholder on a single line. It is compiled
// once at package initialisation instead of on every call.
var placeholderRe = regexp.MustCompile(`\$\{.*?\}`)

// containsPlaceholder reports whether s contains at least one ${...}
// placeholder. The braces may be empty, but the placeholder must not span a
// line break.
func containsPlaceholder(s string) bool {
	return placeholderRe.MatchString(s)
}
- func varReplace(s string, contactInfo *ent.Contact) string {
- nickname := ""
- var cname, carea, cbirthday, cbirtharea, cidcard_no, ctitle, sex, cage string
- if contactInfo != nil {
- nickname = contactInfo.Nickname
- carea = contactInfo.Carea
- cname = contactInfo.Cname
- cbirthday = contactInfo.Cbirthday
- cbirtharea = contactInfo.Cbirtharea
- cidcard_no = contactInfo.CidcardNo
- ctitle = contactInfo.Ctitle
- if contactInfo.Sex == 1 {
- sex = "男"
- } else if contactInfo.Sex == 2 {
- sex = "女"
- }
- age := contactInfo.Cage
- if age > 0 {
- cage = strconv.Itoa(age)
- }
- }
- s = strings.Replace(s, "${nickname}", nickname, -1)
- s = strings.Replace(s, "${cname}", cname, -1)
- s = strings.Replace(s, "${carea}", carea, -1)
- s = strings.Replace(s, "${cbirthday}", cbirthday, -1)
- s = strings.Replace(s, "${cbirtharea}", cbirtharea, -1)
- s = strings.Replace(s, "${cidcard_no}", cidcard_no, -1)
- s = strings.Replace(s, "${ctitle}", ctitle, -1)
- s = strings.Replace(s, "${sex}", sex, -1)
- s = strings.Replace(s, "${cage}", cage, -1)
- return s
- }
|