Forráskód Böngészése

fix:add new crontask

jimmyyem 5 hónapja
szülő
commit
a154c7e0c5

+ 10 - 0
crontask/init.go

@@ -27,4 +27,14 @@ func ScheduleRun(c *cron.Cron, serverCtx *svc.ServiceContext) {
 	c.AddFunc("* * * * *", func() {
 		l.sendMsg()
 	})
+
+	sendWx := NewCronTask(context.Background(), serverCtx)
+	c.AddFunc("* * * * *", func() {
+		sendWx.sendWx()
+	})
+
+	sendWxOnTimeout := NewCronTask(context.Background(), serverCtx)
+	c.AddFunc("* * * * *", func() {
+		sendWxOnTimeout.sendWxOnTimeout()
+	})
 }

+ 7 - 3
crontask/send_msg.go

@@ -18,13 +18,14 @@ import (
 
 func (l *CronTask) sendMsg() {
 	// 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据
-	batchlist, err := l.svcCtx.DB.BatchMsg.Query().Where(batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1)).All(l.ctx)
+	batchList, err := l.svcCtx.DB.BatchMsg.Query().Where(batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1)).All(l.ctx)
 	if err != nil {
-		l.Logger.Errorf("batchlist err: %v", err)
+		l.Logger.Errorf("batchList err: %v", err)
 		return
 	}
 
-	for _, batch := range batchlist {
+	startTime := time.Now()
+	for _, batch := range batchList {
 		// 记录当前批次开始处理
 		l.Logger.Info("batch start: ", batch.BatchNo)
 		// 如果 批次 status 为 0,则先产生待发送消息
@@ -237,6 +238,9 @@ func (l *CronTask) sendMsg() {
 
 	}
 
+	finishTime := time.Now()
+	l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
+	return
 }
 
 // 根据URL获取图片名

+ 147 - 0
crontask/send_wx.go

@@ -0,0 +1,147 @@
+package crontask
+
+import (
+	"context"
+	"encoding/json"
+	"github.com/imroc/req/v3"
+	"strings"
+	"time"
+	"wechat-api/ent"
+	"wechat-api/ent/messagerecords"
+	"wechat-api/ent/server"
+	"wechat-api/ent/wx"
+	"wechat-api/internal/types/payload"
+)
+
+func (l *CronTask) sendWx() {
+	ctx := context.Background()
+	config :=
+		`{
+    "ip": "172.18.41.219",
+    "number": 10,
+    "wx_list": [
+        {
+            "port": "30001",
+            "wxid": "wxid_77au928zeb2p12",
+            "nickname": "百事通"
+        },
+        {
+            "port": "30007",
+            "wxid": "wxid_edc0mvp188ms22",
+            "nickname": "爱博闻"
+        },
+        {
+            "port": "30040",
+            "wxid": "wxid_wupstwe851nb12",
+            "nickname": "轻马小助手"
+        }
+    ]
+}`
+
+	var p payload.SendWxPayload
+	if err := json.Unmarshal([]byte(config), &p); err != nil {
+		l.Logger.Errorf("failed to unmarshal the payload :%v", err)
+		return
+	}
+
+	// 更新最旧的 p.Number 条 status 值为 1 或 4 的 message_records 记录的 status 为 2
+	messages, err := l.svcCtx.DB.MessageRecords.Query().
+		Where(messagerecords.StatusIn(1, 4)).
+		Order(ent.Asc(messagerecords.FieldCreatedAt)).
+		Limit(p.Number).
+		All(ctx)
+	if err != nil {
+		l.Logger.Errorf("get messageRecords list failed %v", err)
+		return
+	}
+
+	startTime := time.Now()
+
+	client := req.C().DevMode()
+	client.SetCommonRetryCount(2).
+		SetCommonRetryBackoffInterval(1*time.Second, 5*time.Second).
+		SetCommonRetryFixedInterval(2 * time.Second).SetTimeout(30 * time.Second)
+
+	type SendTextMsgReq struct {
+		Wxid string `json:"wxid"`
+		Msg  string `json:"msg"`
+	}
+
+	type SendFileMsgReq struct {
+		Wxid        string `json:"wxid"`
+		Filepath    string `json:"filepath"`
+		Diyfilename string `json:"diyfilename"`
+	}
+
+	// 从 Redis 中获取服务器信息(ip, port)
+	getServerInfo := func(wxid string) (string, error) {
+		key := "crontask_wx_server_info"
+		val, err := l.svcCtx.Rds.HGet(ctx, key, wxid).Result()
+		if err != nil {
+			return "", err
+		}
+
+		if val == "" {
+			wx, err := l.svcCtx.DB.Wx.Query().Where(wx.WxidEQ(wxid)).First(l.ctx)
+			if err != nil {
+				l.Logger.Errorf("get wx info failed wxid=%v err=%v", wxid, err)
+				return "", err
+			}
+			server, err := l.svcCtx.DB.Server.Query().Where(server.IDEQ(wx.ServerID)).First(l.ctx)
+			if err != nil {
+				l.Logger.Errorf("get server info failed wxid=%v err=%v", wxid, err)
+				return "", err
+			}
+			val = server.PrivateIP + ":" + server.AdminPort
+			l.svcCtx.Rds.HSet(ctx, key, wxid, val)
+		}
+
+		return val, nil
+	}
+
+	for _, v := range messages {
+		serverInfo, _ := getServerInfo(v.BotWxid)
+		infoArray := strings.Split(serverInfo, ":")
+		ip, wxPort := infoArray[0], infoArray[1]
+
+		// 更新 status 值为 2(发送中)
+		tx, err := l.svcCtx.DB.Tx(l.ctx)
+
+		_, err = tx.MessageRecords.UpdateOneID(v.ID).SetStatus(2).Save(ctx)
+		if err != nil {
+			l.Logger.Errorf("update messageRecords failed id=%v err=%v", v.ID, err)
+			continue
+		}
+
+		if v.ContentType == 1 {
+			_, err = client.R().SetBody(&SendTextMsgReq{
+				Wxid: v.ContactWxid,
+				Msg:  v.Content,
+			}).Post("http://" + ip + ":" + wxPort + "/SendTextMsg")
+		} else {
+			_, err = client.R().SetBody(&SendFileMsgReq{
+				Wxid:        v.ContactWxid,
+				Filepath:    v.Content,
+				Diyfilename: v.Meta.Filename,
+			}).Post("http://" + ip + ":" + wxPort + "/SendFileMsg")
+		}
+
+		if err != nil {
+			_ = tx.Rollback()
+			continue
+		} else {
+			_, err := tx.MessageRecords.UpdateOneID(v.ID).SetStatus(3).SetSendTime(time.Now()).Save(ctx)
+			if err != nil {
+				_ = tx.Rollback()
+			} else {
+				_ = tx.Commit()
+			}
+		}
+
+		time.Sleep(time.Duration(60/p.Number) * time.Second)
+	}
+
+	finishTime := time.Now()
+	l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
+	return
+}

+ 97 - 0
crontask/send_wx_on_timeout.go

@@ -0,0 +1,97 @@
+package crontask
+
+import (
+	"context"
+	"time"
+	"wechat-api/ent"
+	"wechat-api/ent/custom_types"
+	"wechat-api/ent/messagerecords"
+	"wechat-api/ent/sopnode"
+	"wechat-api/ent/sopstage"
+	"wechat-api/ent/soptask"
+)
+
+func (l *CronTask) sendWxOnTimeout() {
+	ctx := context.Background()
+	startTime := time.Now()
+
+	// 查询所有 no_reply_condition 值非 0  的 sop_node 记录
+	nodes, err := l.svcCtx.DB.SopNode.Query().
+		Where(sopnode.NoReplyConditionNEQ(0)).
+		Where(sopnode.HasSopStageWith(
+			sopstage.StatusEQ(1),
+			sopstage.DeletedAtIsNil(),
+			sopstage.HasSopTaskWith(
+				soptask.StatusEQ(3),
+				soptask.DeletedAtIsNil(),
+			),
+		)).
+		All(ctx)
+	if err != nil {
+		l.Logger.Errorf("get node list failed %v", err)
+		return
+	}
+
+	// 遍历 nodes,将其各记录 parent_id 的值存入一个新的数组 parent_nodes 中
+	//parentNodes := make([]uint64, 0)
+	//stages := make([]uint64, 0)
+	messages := make([]*ent.MessageRecords, 0)
+	for _, node := range nodes {
+		lowerBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition+2))
+		upperBound := startTime.Add(-time.Minute * time.Duration(node.NoReplyCondition))
+		if node.ParentID == 0 {
+			messages, _ = l.svcCtx.DB.MessageRecords.Query().
+				Where(messagerecords.StatusEQ(3)).
+				Where(messagerecords.SourceTypeEQ(3)).
+				Where(messagerecords.SourceIDEQ(node.StageID)).
+				Where(messagerecords.SubSourceIDEQ(0)).
+				Where(messagerecords.SendTimeGTE(lowerBound)).
+				Where(messagerecords.SendTimeLTE(upperBound)).
+				All(ctx)
+		} else {
+			messages, _ = l.svcCtx.DB.MessageRecords.Query().
+				Where(messagerecords.StatusEQ(3)).
+				Where(messagerecords.SourceTypeEQ(4)).
+				Where(messagerecords.SourceIDIn(node.ParentID)).
+				Where(messagerecords.SubSourceIDEQ(0)).
+				Where(messagerecords.SendTimeGTE(lowerBound)).
+				Where(messagerecords.SendTimeLTE(upperBound)).
+				All(ctx)
+		}
+		for _, s := range messages {
+			// 判断 s.Id 是否是 s.ContactID 的最新记录
+			latest, _ := l.svcCtx.DB.MessageRecords.Query().
+				Where(messagerecords.ContactIDEQ(s.ContactID)).
+				Where(messagerecords.StatusEQ(3)).
+				Order(ent.Desc(messagerecords.FieldCreatedAt)).
+				First(ctx)
+
+			if latest.ID == s.ID {
+				// 创建 MessageRecords 记录
+				for i, c := range node.ActionMessage {
+					meta := custom_types.Meta{}
+					if c.Meta != nil {
+						meta.Filename = c.Meta.Filename
+					}
+					_, _ = l.svcCtx.DB.MessageRecords.Create().
+						SetStatus(1).
+						SetBotWxid(s.BotWxid).
+						SetContactID(s.ContactID).
+						SetContactType(s.ContactType).
+						SetContactWxid(s.ContactWxid).
+						SetContentType(c.Type).
+						SetContent(c.Content).
+						SetMeta(meta).
+						SetSourceType(4).
+						SetSourceID(node.ID).
+						SetSubSourceID(uint64(i)).
+						Save(ctx)
+				}
+			}
+		}
+	}
+
+	finishTime := time.Now()
+	l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
+	return
+}

+ 6 - 0
internal/enum/taskresult/task_result.go

@@ -0,0 +1,6 @@
+package taskresult
+
+const (
+	Success uint8 = 1 + iota
+	Failed
+)

+ 17 - 0
internal/types/payload/payload.go

@@ -0,0 +1,17 @@
+// Package payload defines all the payload structures used in tasks
+package payload
+
+type SendWxPayload struct {
+	Ip     string   `json:"ip"`
+	Number int      `json:"number"`
+	WxList []WxList `json:"wx_list"`
+}
+
+type WxList struct {
+	Port     string `json:"port"`
+	Wxid     string `json:"wxid"`
+	Nickname string `json:"nickname"`
+}
+
+type SendWxOnTimeoutPayload struct {
+}