Pārlūkot izejas kodu

fix:修改群发和单发

jimmyyem 2 nedēļas atpakaļ
vecāks
revīzija
e87205e088

+ 25 - 26
crontask/send_msg.go

@@ -25,14 +25,14 @@ func (l *CronTask) sendMsg() {
 	).All(l.ctx)
 	l.Logger.Infof("send_msg.go BatchList %v\n", batchList)
 	if err != nil {
-		l.Logger.Errorf("batchList err: %v", err)
+		l.Logger.Errorf("batchList err: %v\n", err)
 		return
 	}
 
 	startTime := time.Now()
 	for _, batch := range batchList {
 		// 记录当前批次开始处理
-		l.Logger.Info("batch start: ", batch.BatchNo)
+		l.Logger.Infof("batch start: %s\n", batch.BatchNo)
 		// 如果 批次 status 为 0,则先产生待发送消息
 		if batch.Status == 0 {
 			userList := make([]*ent.Contact, 0)
@@ -49,11 +49,11 @@ func (l *CronTask) sendMsg() {
 			var contactTags, groupTags []uint64
 			if contactTags, ok = tagMap["contact_tag"]; ok {
 				allContact = hasAll(contactTags, 0)
-				l.Logger.Infof("contactTags=%v", contactTags)
+				l.Logger.Infof("contactTags=%v \n", contactTags)
 			}
 			if groupTags, ok = tagMap["group_tag"]; ok {
 				allGroup = hasAll(groupTags, 0)
-				l.Logger.Infof("groupTags=%v", groupTags)
+				l.Logger.Infof("groupTags=%v \n", groupTags)
 			}
 
 			var err error
@@ -65,7 +65,7 @@ func (l *CronTask) sendMsg() {
 					contact.CtypeIn(1, 3),
 				).All(l.ctx)
 				if err != nil {
-					l.Logger.Errorf("userlist err: %v", err)
+					l.Logger.Errorf("userlist err: %v \n", err)
 					continue
 				}
 			} else {
@@ -77,13 +77,13 @@ func (l *CronTask) sendMsg() {
 						contact.CtypeIn(1, 3),
 					).All(l.ctx)
 					if err != nil {
-						l.Logger.Errorf("userList err: %v", err)
+						l.Logger.Errorf("userList err: %v \n", err)
 						continue
 					}
 				} else { //获取指定标签的联系人
 					userList, err = l.getContactList(contactTags, batch.Fromwxid, 1)
 					if err != nil {
-						l.Logger.Errorf("userList err: %v", err)
+						l.Logger.Errorf("userList err: %v \n", err)
 						continue
 					}
 				}
@@ -96,13 +96,13 @@ func (l *CronTask) sendMsg() {
 						contact.CtypeIn(1, 3),
 					).All(l.ctx)
 					if err != nil {
-						l.Logger.Errorf("groupList err: %v", err)
+						l.Logger.Errorf("groupList err: %v \n", err)
 						continue
 					}
 				} else { //获取指定标签的群
 					groupList, err = l.getContactList(groupTags, batch.Fromwxid, 2)
 					if err != nil {
-						l.Logger.Errorf("groupList err: %v", err)
+						l.Logger.Errorf("groupList err: %v \n", err)
 						continue
 					}
 				}
@@ -118,7 +118,7 @@ func (l *CronTask) sendMsg() {
 			// 这里是把 batch.Msg 转换为 json 数组
 			msgArray := make([]custom_types.Action, 0)
 			err = json.Unmarshal([]byte(batch.Msg), &msgArray)
-			l.Logger.Infof("msgArray length= %v, err:%v", len(msgArray), err)
+			l.Logger.Infof("msgArray length= %v, err:%v \n", len(msgArray), err)
 			if err != nil {
 				// json 解析失败
 				msgArray = make([]custom_types.Action, 0)
@@ -149,19 +149,19 @@ func (l *CronTask) sendMsg() {
 				// 加事务,批量操作一条 batch_msg 和 一堆 msg 信息
 				tx, err := l.svcCtx.DB.Tx(l.ctx)
 				if err != nil {
-					l.Logger.Errorf("start db transaction err: %v", err)
+					l.Logger.Errorf("start db transaction err: %v \n", err)
 					continue
 				}
 				_, err = tx.BatchMsg.UpdateOneID(batch.ID).Where(batchmsg.StatusNEQ(1)).SetStatus(1).Save(l.ctx)
 				if err != nil {
 					_ = tx.Rollback()
-					l.Logger.Errorf("batchmsg update err: %v", err)
+					l.Logger.Errorf("batchmsg update err: %v \n", err)
 					continue
 				}
 				_, err = tx.Msg.CreateBulk(msgs...).Save(l.ctx)
 				if err != nil {
 					_ = tx.Rollback()
-					l.Logger.Errorf("msg CreateBulk err: %v", err)
+					l.Logger.Errorf("msg CreateBulk err: %v \n", err)
 					continue
 				}
 				_ = tx.Commit()
@@ -174,7 +174,7 @@ func (l *CronTask) sendMsg() {
 					SetFail(0).
 					Save(l.ctx)
 				if err != nil {
-					l.Logger.Errorf("batchmsg update err: %v", err)
+					l.Logger.Errorf("batchmsg update err: %v \n", err)
 				}
 				continue
 			}
@@ -186,35 +186,34 @@ func (l *CronTask) sendMsg() {
 			msg.StatusEQ(0),
 		).All(l.ctx)
 		if err != nil {
-			l.Logger.Errorf("msglist err: %v", err)
+			l.Logger.Errorf("msglist err: %v \n", err)
 			continue
 		}
 
 		wxInfo, err := l.svcCtx.DB.Wx.Query().Where(wx.Wxid(batch.Fromwxid)).Only(l.ctx)
 		if err != nil {
-			l.Logger.Errorf("wxInfo err: %v", err)
+			l.Logger.Errorf("wxInfo err: %v \n", err)
 			continue
 		}
 
 		privateIP := ""
 		adminPort := ""
 		port := ""
-		var ctype uint64
+		ctype := batch.Ctype
 
 		if wxInfo.ServerID != 0 {
 			serverInfo, err := l.svcCtx.DB.Server.Get(l.ctx, wxInfo.ServerID)
 			if err != nil {
-				l.Logger.Errorf("serverInfo err: %v", err)
+				l.Logger.Errorf("serverInfo err: %v \n", err)
 				continue
 			}
 			privateIP = serverInfo.PrivateIP
 			adminPort = serverInfo.AdminPort
 			port = wxInfo.Port
-			ctype = wxInfo.Ctype
 		}
 
 		var hookClient *hook.Hook
-		if ctype == uint64(3) {
+		if ctype == 3 {
 			hookClient = hook.NewWecomHook("", adminPort, port)
 		} else {
 			hookClient = hook.NewHook(privateIP, adminPort, port)
@@ -234,10 +233,10 @@ func (l *CronTask) sendMsg() {
 			// 每次发完暂停1秒
 			time.Sleep(time.Second)
 			if err != nil {
-				l.Logger.Errorf("send msg err: %v", err)
+				l.Logger.Errorf("send msg err: %v \n", err)
 				_, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(2).Save(l.ctx)
 				if err != nil {
-					l.Logger.Errorf("msg update err: %v", err)
+					l.Logger.Errorf("msg update err: %v \n", err)
 					continue
 				}
 				continue
@@ -245,7 +244,7 @@ func (l *CronTask) sendMsg() {
 
 			_, err = l.svcCtx.DB.Msg.UpdateOneID(msg.ID).SetStatus(1).Save(l.ctx)
 			if err != nil {
-				l.Logger.Errorf("msg update err: %v", err)
+				l.Logger.Errorf("msg update err: %v \n", err)
 				continue
 			}
 		}
@@ -268,15 +267,15 @@ func (l *CronTask) sendMsg() {
 			SetStopTime(time.Now()).
 			Save(l.ctx)
 		if err != nil {
-			l.Logger.Errorf("batchmsg update err: %v", err)
+			l.Logger.Errorf("batchmsg update err: %v \n", err)
 			continue
 		}
-		l.Logger.Info("batch stop: ", batch.BatchNo)
+		l.Logger.Infof("batch stop:%s \n", batch.BatchNo)
 
 	}
 
 	finishTime := time.Now()
-	l.Logger.Infof("This process cost %v", finishTime.Sub(startTime).String())
+	l.Logger.Infof("This process cost %v \n", finishTime.Sub(startTime).String())
 	return
 }
 

+ 2 - 0
desc/wechat/wxhook.api

@@ -42,6 +42,8 @@ type (
 
         // 微信文本消息内容
         Msg *string `json:"msg"`
+
+		Ctype *uint64 `json:"ctype,optional"`
     }
 
         // 发送微信图片消息请求参数

+ 41 - 80
hook/message.go

@@ -131,25 +131,26 @@ func (h *Hook) SendTextMsg(wxid, msg, wxWxid string) error {
 // SendPicMsg 发送微信图片
 func (h *Hook) SendPicMsg(wxid, picpath, diyfilename, wxWxid string) (err error) {
 	if h.ServerIp == "" {
-		if h.Ctype == uint64(3) {
-			conn, err := h.connWorkPhone()
+		conn, err := h.connWorkPhone()
+		if err != nil {
+			err = fmt.Errorf("SendPicMsg failed")
+			return err
+		}
+		defer func(conn *websocket.Conn) {
+			err = conn.Close()
 			if err != nil {
 				err = fmt.Errorf("SendPicMsg failed")
-				return err
-			}
-			defer func(conn *websocket.Conn) {
-				err = conn.Close()
-				if err != nil {
-					err = fmt.Errorf("SendPicMsg failed")
-				}
-			}(conn)
-			encodedString := base64.StdEncoding.EncodeToString([]byte(picpath))
-			contentType := "File"
-			if isImageFile(diyfilename) {
-				contentType = "Picture"
 			}
+		}(conn)
+		encodedString := base64.StdEncoding.EncodeToString([]byte(picpath))
+		contentType := "File"
+		if isImageFile(diyfilename) {
+			contentType = "Picture"
+		}
 
-			message := map[string]interface{}{
+		var message map[string]interface{}
+		if h.Ctype == uint64(3) {
+			message = map[string]interface{}{
 				"MsgType": "TalkToFriendTask",
 				"Content": map[string]interface{}{
 					"WxId":        wxWxid,
@@ -158,49 +159,8 @@ func (h *Hook) SendPicMsg(wxid, picpath, diyfilename, wxWxid string) (err error)
 					"Content":     encodedString,
 				},
 			}
-			transportMessageJSON, err := json.Marshal(message)
-			if err != nil {
-				return err
-			}
-			// 发送 JSON 消息
-			err = conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
-			if err != nil {
-				return fmt.Errorf("failed to send message: %v", err)
-			}
-			// 读取回复消息
-			_, replyMessage, err := conn.ReadMessage()
-			if err != nil {
-				return fmt.Errorf("failed to read message: %v", err)
-			}
-			var replyMessageData map[string]interface{}
-			err = json.Unmarshal([]byte(replyMessage), &replyMessageData)
-			if err != nil {
-				return fmt.Errorf("failed to send message: %v", err)
-			}
-			if replyMessageData["msgType"] == nil || replyMessageData["msgType"] != workphone.EnumMsgType_name[int32(workphone.EnumMsgType_MsgReceivedAck)] {
-				return fmt.Errorf("failed to send message: %v")
-			}
-
-			return nil
 		} else {
-			conn, err := h.connWorkPhone()
-			if err != nil {
-				err = fmt.Errorf("SendTextMsg failed")
-				return err
-			}
-			defer func(conn *websocket.Conn) {
-				err = conn.Close()
-				if err != nil {
-					err = fmt.Errorf("SendTextMsg failed")
-				}
-			}(conn)
-			encodedString := base64.StdEncoding.EncodeToString([]byte(picpath))
-			contentType := "File"
-			if isImageFile(diyfilename) {
-				contentType = "Picture"
-			}
-
-			message := map[string]interface{}{
+			message = map[string]interface{}{
 				"MsgType": "TalkToFriendTask",
 				"Content": map[string]interface{}{
 					"WeChatId":    wxWxid,
@@ -209,31 +169,32 @@ func (h *Hook) SendPicMsg(wxid, picpath, diyfilename, wxWxid string) (err error)
 					"Content":     encodedString,
 				},
 			}
-			transportMessageJSON, err := json.Marshal(message)
-			if err != nil {
-				return err
-			}
-			// 发送 JSON 消息
-			err = conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
-			if err != nil {
-				return fmt.Errorf("failed to send message: %v", err)
-			}
-			// 读取回复消息
-			_, replyMessage, err := conn.ReadMessage()
-			if err != nil {
-				return fmt.Errorf("failed to read message: %v", err)
-			}
-			var replyMessageData map[string]interface{}
-			err = json.Unmarshal([]byte(replyMessage), &replyMessageData)
-			if err != nil {
-				return fmt.Errorf("failed to send message: %v", err)
-			}
-			if replyMessageData["msgType"] == nil || replyMessageData["msgType"] != workphone.EnumMsgType_name[int32(workphone.EnumMsgType_MsgReceivedAck)] {
-				return fmt.Errorf("failed to send message: %v")
-			}
+		}
 
-			return nil
+		transportMessageJSON, err := json.Marshal(message)
+		if err != nil {
+			return err
+		}
+		// 发送 JSON 消息
+		err = conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
+		if err != nil {
+			return fmt.Errorf("failed to send message: %v", err)
+		}
+		// 读取回复消息
+		_, replyMessage, err := conn.ReadMessage()
+		if err != nil {
+			return fmt.Errorf("failed to read message: %v", err)
 		}
+		var replyMessageData map[string]interface{}
+		err = json.Unmarshal([]byte(replyMessage), &replyMessageData)
+		if err != nil {
+			return fmt.Errorf("failed to send message: %v", err)
+		}
+		if replyMessageData["msgType"] == nil || replyMessageData["msgType"] != workphone.EnumMsgType_name[int32(workphone.EnumMsgType_MsgReceivedAck)] {
+			return fmt.Errorf("failed to send message: %v")
+		}
+
+		return nil
 	} else {
 		resp, err := h.Client.R().SetBody(&SendPicMsgReq{
 			Wxid:        wxid,

+ 11 - 1
internal/logic/Wxhook/send_text_msg_logic.go

@@ -61,7 +61,17 @@ func (l *SendTextMsgLogic) SendTextMsg(req *types.SendTextMsgReq) (resp *types.B
 		port = wxInfo.Port
 	}
 
-	hookClient := hook.NewHook(privateIP, adminPort, port)
+	var ctype uint64
+	if req.Ctype != nil && *req.Ctype != 0 {
+		ctype = *req.Ctype
+	}
+
+	var hookClient *hook.Hook
+	if ctype == 3 {
+		hookClient = hook.NewWecomHook("", adminPort, port)
+	} else {
+		hookClient = hook.NewHook(privateIP, adminPort, port)
+	}
 
 	err = hookClient.SendTextMsg(*req.Wxid, *req.Msg, wxInfo.Wxid)
 

+ 2 - 1
internal/types/types.go

@@ -943,7 +943,8 @@ type SendTextMsgReq struct {
 	// 微信id 公众号微信ID
 	Wxid *string `json:"receiverWxId"`
 	// 微信文本消息内容
-	Msg *string `json:"msg"`
+	Msg   *string `json:"msg"`
+	Ctype *uint64 `json:"ctype,optional"`
 }
 
 // 发送微信图片消息请求参数