Sfoglia il codice sorgente

群发增加时间控制,增加定时任务

DESKTOP-53URE31\USER 7 mesi fa
parent
commit
cfa394837d

+ 30 - 0
crontask/init.go

@@ -0,0 +1,30 @@
+package crontask
+
+import (
+	"context"
+	"wechat-api/internal/svc"
+
+	"github.com/robfig/cron/v3"
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type CronTask struct {
+	logx.Logger
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+}
+
+func NewCronTask(ctx context.Context, svcCtx *svc.ServiceContext) *CronTask {
+	return &CronTask{
+		Logger: logx.WithContext(ctx),
+		ctx:    ctx,
+		svcCtx: svcCtx,
+	}
+}
+
+func ScheduleRun(c *cron.Cron, serverCtx *svc.ServiceContext) {
+	l := NewCronTask(context.Background(), serverCtx)
+	c.AddFunc("* * * * *", func() {
+		l.sendMsg()
+	})
+}

+ 176 - 0
crontask/send_msg.go

@@ -0,0 +1,176 @@
+package crontask
+
+import (
+	"strings"
+	"time"
+	"wechat-api/ent"
+	"wechat-api/ent/batchmsg"
+	"wechat-api/ent/contact"
+	"wechat-api/ent/label"
+	"wechat-api/ent/labelrelationship"
+	"wechat-api/ent/msg"
+	"wechat-api/ent/wx"
+	"wechat-api/hook"
+)
+
+func (l *CronTask) sendMsg() {
+	// 获取 BatchMsg 表中 start_time 小于当前时间并且 status 为 0 或 1 的数据
+	batchlist, err := l.svcCtx.DB.BatchMsg.Query().Where(batchmsg.StartTimeLT(time.Now()), batchmsg.StatusIn(0, 1)).All(l.ctx)
+	if err != nil {
+		l.Logger.Errorf("batchlist err: %v", err)
+		return
+	}
+
+	for _, batch := range batchlist {
+		// 记录当前批次开始处理
+		l.Logger.Info("batch start: ", batch.BatchNo)
+		// 如果 批次 status 为 0,则先产生待发送消息
+		if batch.Status == 0 {
+			userlist := make([]*ent.Contact, 0)
+			if batch.Tag == "all" {
+				// 获取 contact 表中  wx_wxid 等于 req.Fromwxid 的 type 为1或2的数据
+				userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.TypeIn(1, 2)).All(l.ctx)
+				if err != nil {
+					l.Logger.Errorf("userlist err: %v", err)
+					continue
+				}
+			} else {
+
+				tags := strings.Split(batch.Tag, ",")
+
+				// 获取 label 表中 name 为 tags的记录
+				labids, err := l.svcCtx.DB.Label.Query().Where(label.NameIn(tags...)).IDs(l.ctx)
+				if err != nil {
+					l.Logger.Errorf("labids err: %v", err)
+					continue
+				}
+				// 获取 label_relationship 表中,label_id 等于 labids 的 contact_id
+				labelrelationships, err := l.svcCtx.DB.LabelRelationship.Query().Where(labelrelationship.LabelIDIn(labids...)).All(l.ctx)
+				if err != nil {
+					l.Logger.Errorf("labelrelationships err: %v", err)
+					continue
+				}
+				contact_ids := make([]uint64, 0)
+				for _, labelrelationship := range labelrelationships {
+					contact_ids = append(contact_ids, labelrelationship.ContactID)
+				}
+				if len(contact_ids) > 0 {
+					// 获取 contact 表中 wx_wxid 等于 req.Fromwxid 并且 id 等于 contact_ids 并且 type 为1或2 的数据
+					userlist, err = l.svcCtx.DB.Contact.Query().Where(contact.WxWxid(batch.Fromwxid), contact.IDIn(contact_ids...), contact.TypeIn(1, 2)).All(l.ctx)
+					if err != nil {
+						l.Logger.Errorf("userlist err: %v", err)
+						continue
+					}
+				}
+			}
+
+			msgs := make([]*ent.MsgCreate, 0)
+
+			for _, user := range userlist {
+
+				msg := l.svcCtx.DB.Msg.Create().
+					SetNotNilFromwxid(&batch.Fromwxid).
+					SetNotNilToid(&user.Wxid).
+					SetMsgtype(1).
+					SetNotNilMsg(&batch.Msg).
+					SetStatus(0).
+					SetNotNilBatchNo(&batch.BatchNo)
+
+				msgs = append(msgs, msg)
+			}
+
+			if len(msgs) > 0 {
+				_, err = l.svcCtx.DB.Msg.CreateBulk(msgs...).Save(l.ctx)
+				if err != nil {
+					l.Logger.Errorf("msg CreateBulk err: %v", err)
+					continue
+				}
+			} else {
+				// 如果没有消息,直接更新批次状态为已发送
+				_, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
+					SetStatus(2).
+					SetTotal(0).
+					SetSuccess(0).
+					SetFail(0).
+					Save(l.ctx)
+				if err != nil {
+					l.Logger.Errorf("batchmsg update err: %v", err)
+				}
+				continue
+			}
+
+			_, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
+			if err != nil {
+				l.Logger.Errorf("batchmsg update err: %v", err)
+				continue
+			}
+		}
+
+		// 获取当前批次的所有待发送消息
+		msglist, err := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(0)).All(l.ctx)
+		if err != nil {
+			l.Logger.Errorf("msglist err: %v", err)
+			continue
+		}
+
+		wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
+		if err != nil {
+			l.Logger.Errorf("wxInfo err: %v", err)
+			continue
+		}
+		serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
+		if err != nil {
+			l.Logger.Errorf("serverInfo err: %v", err)
+			continue
+		}
+
+		hookClient := hook.NewHook(serverInfo.PrivateIP, serverInfo.AdminPort, wxInfo.Port)
+
+		//循环发送消息
+		for _, msg := range msglist {
+			err = hookClient.SendTextMsg(msg.Toid, msg.Msg)
+			// 每次发完暂停1秒
+			time.Sleep(time.Second)
+			if err != nil {
+				l.Logger.Errorf("send msg err: %v", err)
+				_, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
+				if err != nil {
+					l.Logger.Errorf("msg update err: %v", err)
+					continue
+				}
+				continue
+			}
+
+			_, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
+			if err != nil {
+				l.Logger.Errorf("msg update err: %v", err)
+				continue
+			}
+		}
+
+		// 获取当前批次的所有发送的消息总数
+		total, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo)).Count(l.ctx)
+
+		// 获取当前批次的所有发送成功的消息总数
+		success, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(1)).Count(l.ctx)
+
+		// 获取当前批次的所有发送失败的消息总数
+		fail, _ := l.svcCtx.DB.Msg.Query().Where(msg.BatchNoEQ(batch.BatchNo), msg.StatusEQ(2)).Count(l.ctx)
+
+		// 更新批次状态为已发送,同时更新发送总数、发送成功数量、失败数量、结束时间
+		_, err = l.svcCtx.DB.BatchMsg.UpdateOneID(batch.ID).
+			SetStatus(2).
+			SetTotal(int32(total)).
+			SetSuccess(int32(success)).
+			SetFail(int32(fail)).
+			SetStopTime(time.Now()).
+			Save(l.ctx)
+		if err != nil {
+			l.Logger.Errorf("batchmsg update err: %v", err)
+			continue
+		}
+		l.Logger.Info("batch stop: ", batch.BatchNo)
+
+	}
+
+}

+ 2 - 0
desc/wechat/batch_msg.api

@@ -32,6 +32,8 @@ type (
         // 开始时间 
         StartTime  *int64 `json:"startTime,optional"`
 
+        StartTimeStr  *string `json:"startTimeStr,optional"`
+
         // 结束时间 
         StopTime  *int64 `json:"stopTime,optional"`
 

+ 12 - 9
etc/wechat.yaml

@@ -3,8 +3,10 @@ Host: 0.0.0.0
 Port: 19101
 Timeout: 30000
 
+Mode: "dev"
+
 Auth:
-  AccessSecret: "jS6VKDtsJf3z1n2VKDtsJf3z1n2"
+  AccessSecret: "LnQD46hBde0AgFXBer8ZZZe3FgC"
   AccessExpire: 259200
 
 CROSConf:
@@ -13,7 +15,8 @@ CROSConf:
 Log:
   ServiceName: WechatApiLogger
   Mode: console
-  Path: /Users/songbowen/development/gooki/scrm/gateway-ai
+  Encoding: plain
+  Stat: false
   Level: info
   Compress: false
   KeepDays: 7
@@ -21,10 +24,10 @@ Log:
 
 DatabaseConf:
   Type: mysql
-  Host: mysql-server
+  Host: localhost
   Port: 3306
   DBName: wechat
-  Username: root
+  Username: simple-admin
   Password: simple-admin.
   MaxOpenConn: 100
   SSLMode: disable
@@ -33,18 +36,18 @@ DatabaseConf:
 CoreRpc:
   # Target: k8s://default/core-rpc-svc:9101
   Endpoints:
-    - core-rpc:9101
+    - 127.0.0.1:9101
   Enabled: true
 
 RedisConf:
-  Host: redis-server:6379
+  Host: 127.0.0.1:6379
 
 CasbinDatabaseConf:
   Type: mysql
-  Host: mysql-server
+  Host: localhost
   Port: 3306
-  DBName: simple_admin
-  Username: root
+  DBName: wechat-admin
+  Username: simple-admin
   Password: simple-admin.
   MaxOpenConn: 100
   SSLMode: disable

+ 1 - 0
go.mod

@@ -91,6 +91,7 @@ require (
 	github.com/quic-go/qpack v0.4.0 // indirect
 	github.com/quic-go/quic-go v0.42.0 // indirect
 	github.com/refraction-networking/utls v1.6.3 // indirect
+	github.com/robfig/cron/v3 v3.0.1 // indirect
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
 	github.com/zclconf/go-cty v1.14.3 // indirect
 	go.etcd.io/etcd/api/v3 v3.5.12 // indirect

+ 2 - 0
go.sum

@@ -443,6 +443,8 @@ github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLB
 github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
 github.com/refraction-networking/utls v1.6.3 h1:MFOfRN35sSx6K5AZNIoESsBuBxS2LCgRilRIdHb6fDc=
 github.com/refraction-networking/utls v1.6.3/go.mod h1:yil9+7qSl+gBwJqztoQseO6Pr3h62pQoY1lXiNR/FPs=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=

+ 19 - 20
internal/logic/batch_msg/create_batch_msg_logic.go

@@ -3,6 +3,7 @@ package batch_msg
 import (
 	"context"
 	"strings"
+	"time"
 
 	"wechat-api/ent"
 	"wechat-api/ent/contact"
@@ -37,6 +38,23 @@ func (l *CreateBatchMsgLogic) CreateBatchMsg(req *types.BatchMsgInfo) (*types.Ba
 	for _, label := range req.Labels {
 		if strings.EqualFold(label, "all") || strings.EqualFold(label, "全部") {
 			all = true
+			tagstring := "all"
+			req.Tag = &tagstring
+		}
+	}
+
+	l.Logger.Infof("CreateBatchMsgLogic: %v", req)
+
+	startTime := time.Now()
+	// req.StartTimeStr 不为nil并且不为空
+	if req.StartTimeStr != nil && *req.StartTimeStr != "" {
+		var err error
+		// 将 req.StartTimeStr 转换为 req.StartTime
+		startTime, err = time.Parse("2006-01-02 15:04:05", *req.StartTimeStr)
+		if err != nil {
+			// 处理错误,例如打印错误并返回
+			l.Logger.Errorf("时间字符串转换错误: %v", err)
+			return nil, err
 		}
 	}
 
@@ -90,31 +108,12 @@ func (l *CreateBatchMsgLogic) CreateBatchMsg(req *types.BatchMsgInfo) (*types.Ba
 		SetNotNilMsg(req.Msg).
 		SetNotNilTag(req.Tag).
 		SetTotal(total).
+		SetNotNilStartTime(&startTime).
 		Save(l.ctx)
 
 	if err != nil {
 		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
 	}
 
-	msgs := make([]*ent.MsgCreate, 0)
-
-	for _, user := range userlist {
-
-		msg := l.svcCtx.DB.Msg.Create().
-			SetNotNilFromwxid(req.Fromwxid).
-			SetNotNilToid(&user.Wxid).
-			SetMsgtype(1).
-			SetNotNilMsg(req.Msg).
-			SetNotNilBatchNo(&batchNo)
-
-		msgs = append(msgs, msg)
-	}
-
-	_, err = l.svcCtx.DB.Msg.CreateBulk(msgs...).Save(l.ctx)
-
-	if err != nil {
-		return nil, dberrorhandler.DefaultEntError(l.Logger, err, req)
-	}
-
 	return &types.BaseMsgResp{Msg: errormsg.CreateSuccess}, nil
 }

+ 2 - 1
internal/types/types.go

@@ -1042,7 +1042,8 @@ type BatchMsgInfo struct {
 	// 失败数量
 	Fail *int32 `json:"fail,optional"`
 	// 开始时间
-	StartTime *int64 `json:"startTime,optional"`
+	StartTime    *int64  `json:"startTime,optional"`
+	StartTimeStr *string `json:"startTimeStr,optional"`
 	// 结束时间
 	StopTime *int64 `json:"stopTime,optional"`
 	// 标签列表

+ 9 - 0
wechat.go

@@ -26,10 +26,12 @@ import (
 	"flag"
 	"fmt"
 
+	"wechat-api/crontask"
 	"wechat-api/internal/config"
 	"wechat-api/internal/handler"
 	"wechat-api/internal/svc"
 
+	"github.com/robfig/cron/v3"
 	"github.com/zeromicro/go-zero/core/conf"
 	"github.com/zeromicro/go-zero/rest"
 )
@@ -48,6 +50,13 @@ func main() {
 	ctx := svc.NewServiceContext(c)
 	handler.RegisterHandlers(server, ctx)
 
+	cronCtx := cron.New(cron.WithChain(
+		cron.SkipIfStillRunning(cron.DefaultLogger),
+	))
+	crontask.ScheduleRun(cronCtx, ctx)
+	go cronCtx.Start()
+	defer cronCtx.Stop()
+
 	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
 	server.Start()
 }