Procházet zdrojové kódy

Merge branch 'wjh' into debug

wuroy.eth před 1 týdnem
rodič
revize
ea0be24111

+ 9 - 1
etc/wechat.yaml

@@ -84,4 +84,12 @@ Fastgpt:
 
 Xiaoice:
   SubscriptionKey: ccaa5a6ff70f4393a934e69b9ace31bb
-  GptbotsAuthorization: app-hQL7oVq57McK5VBHlsMfhtUD
+  GptbotsAuthorization: app-hQL7oVq57McK5VBHlsMfhtUD
+
+WeChatWs:
+  -
+    Url: ws://chat.gkscrm.com:13088
+    Appid: default
+  -
+    Url: ws://chat.gkscrm.com:13088
+    Appid: wechat

+ 1 - 0
internal/config/config.go

@@ -21,4 +21,5 @@ type Config struct {
 	Aliyun             types.Aliyun
 	CoreRpc            zrpc.RpcClientConf
 	Xiaoice            types.Xiaoice
+	WeChatWs           []types.WsConfig
 }

+ 33 - 0
internal/pkg/wechat_ws/test/main.go

@@ -0,0 +1,33 @@
+package main
+
+import (
+	"github.com/zeromicro/go-zero/core/logx"
+	"net/url"
+	"wechat-api/internal/pkg/wechat_ws"
+)
+
+func msgHandler(msg wechat_ws.MsgJsonObject) error {
+	logx.Info("当前处理的消息是:", msg)
+	return nil
+}
+
+func main() {
+	u := url.URL{Scheme: "ws", Host: "chat.gkscrm.com:13088"}
+
+	logx.Info(u.String())
+
+	client, err := wechat_ws.NewWechatWsClient(u.String(), "default")
+	if err != nil {
+		return
+	}
+
+	go client.WritePump()
+
+	//client.SendMsg([]byte(`{"msgType":"text","message":"你好"}`))
+	//
+	//time.Sleep(5)
+	//
+	//client.SendMsg([]byte(`{"msgType":"text","message":"二次你好"}`))
+
+	client.ReadPump(msgHandler)
+}

+ 10 - 0
internal/pkg/wechat_ws/type.go

@@ -0,0 +1,10 @@
+package wechat_ws
+
+type MsgJsonObject struct {
+	AccessToken string `json:"accessToken,omitempty"` //设备通信token
+	MsgType     string `json:"msgType,omitempty"`     //承载的具体消息类型
+	Message     string `json:"message,omitempty"`     //具体的消息数据
+	RefMsgId    int64  `json:"refMsgId,omitempty"`
+}
+
+type MessageHandler func(msg MsgJsonObject) error

+ 323 - 0
internal/pkg/wechat_ws/wechat_ws_client.go

@@ -0,0 +1,323 @@
+package wechat_ws
+
+import (
+	"bytes"
+	"encoding/json"
+	"github.com/gorilla/websocket"
+	"github.com/zeromicro/go-zero/core/logx"
+	"google.golang.org/protobuf/encoding/protojson"
+	"google.golang.org/protobuf/types/known/anypb"
+	"time"
+	"wechat-api/workphone"
+)
+
+const (
+	// Time allowed to write a message to the peer.
+	writeWait = 10 * time.Second
+
+	// Time allowed to read the next pong message from the peer.
+	pongWait = 30 * time.Second
+
+	// Send pings to peer with this period. Must be less than pongWait.
+	pingPeriod = (pongWait * 4) / 10
+
+	// Maximum message size allowed from peer.
+	maxMessageSize = 2048
+)
+
+var (
+	newline = []byte{'\n'}
+	space   = []byte{' '}
+)
+
+type WechatWsClient struct {
+	Conn        *websocket.Conn
+	AccessToken string
+	Send        chan []byte
+	AppId       string
+}
+
+func NewWechatWsClient(urlStr string, appid string) (*WechatWsClient, error) {
+	logx.Debug("实例开始")
+	c, _, err := websocket.DefaultDialer.Dial(urlStr, nil)
+	if err != nil {
+		logx.Error(err)
+		return nil, err
+	}
+
+	client := &WechatWsClient{
+		Conn:        c,
+		AccessToken: "",
+		Send:        make(chan []byte, 256),
+		AppId:       appid,
+	}
+
+	err = client.DeviceAuth()
+	if err != nil {
+		return nil, err
+	}
+
+	return client, nil
+}
+
+// SendMsg 立刻发送消息,发送不成功可以获取错误信息
+func (c *WechatWsClient) SendMsg(message []byte) error {
+	logx.Info("发送消息:", string(message))
+	err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
+	if err != nil {
+		return err
+	}
+	err = c.Conn.WriteMessage(websocket.TextMessage, message)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
+func (c *WechatWsClient) SendMsgByChan(msg []byte) {
+	c.Send <- msg
+}
+
+func (c *WechatWsClient) WritePump() {
+	ticker := time.NewTicker(pingPeriod)
+	defer func() {
+		ticker.Stop()
+		err := c.Conn.Close()
+		if err != nil {
+			return
+		}
+	}()
+	for {
+		select {
+		case message, ok := <-c.Send:
+			if !ok {
+				continue
+			}
+			_ = c.SendMsg(message)
+
+			// 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
+			//_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
+			//if !ok {
+			//	_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
+			//	return
+			//}
+			//
+			//w, err := c.conn.NextWriter(websocket.TextMessage)
+			//if err != nil {
+			//	return
+			//}
+			//_, _ = w.Write(message)
+			//
+			//// Add queued chat messages to the current websocket message.
+			//n := len(c.send)
+			//for i := 0; i < n; i++ {
+			//	_, _ = w.Write(newline)
+			//	_, _ = w.Write(<-c.send)
+			//}
+			//
+			//if err := w.Close(); err != nil {
+			//	return
+			//}
+		case <-ticker.C:
+			if c.AccessToken == "" {
+				logx.Error("accessToken is empty")
+				continue
+			}
+
+			message := map[string]interface{}{
+				"Id":          1001,
+				"MsgType":     "HeartBeatReq",
+				"AccessToken": c.AccessToken,
+				"Content": map[string]string{
+					"token": c.AccessToken,
+				},
+			}
+
+			//transportMessageJSON, err := json.Marshal(message)
+			//if err != nil {
+			//	logx.Error(err)
+			//	continue
+			//}
+			//
+			_ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
+			//err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
+
+			err := c.Conn.WriteJSON(message)
+
+			if err != nil {
+				logx.Error(err)
+				continue
+			}
+			logx.Info("发送心跳保活~~")
+		}
+	}
+}
+
+func (c *WechatWsClient) ReadPump(handler MessageHandler) {
+	defer func() {
+		err := c.Conn.Close()
+		if err != nil {
+			return
+		}
+	}()
+	c.Conn.SetReadLimit(maxMessageSize)
+	err := c.Conn.SetReadDeadline(time.Time{})
+	if err != nil {
+		logx.Errorf("SetReadDeadline error: %v", err)
+		return
+	}
+	//c.conn.SetPongHandler(func(string) error {
+	//	err := c.conn.SetReadDeadline(time.Time{})
+	//	if err != nil {
+	//		return err
+	//	}
+	//	return nil
+	//})
+	for {
+		_, message, err := c.Conn.ReadMessage()
+		if err != nil {
+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+				logx.Errorf("IsUnexpectedCloseError error: %v", err)
+			} else {
+				logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
+			}
+			break
+		}
+		message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
+
+		logx.Debugf("message 获取的消息原型是 : %s", string(message))
+
+		var msg MsgJsonObject
+		//err = c.Conn.ReadJSON(msg)
+		err = json.Unmarshal(message, &msg)
+		if err != nil {
+			logx.Error(err)
+			continue
+		}
+		//logx.Info("message 获取的消息原型是 : ", msg)
+
+		switch msg.MsgType {
+		case "MsgReceivedAck":
+			logx.Info("心跳确认消息已收到,无需处理")
+		default:
+			if handler != nil {
+				err = handler(msg)
+				if err != nil {
+					logx.Error(err)
+				}
+			}
+		}
+
+	}
+}
+
+// DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
+//
+//go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
+func (c *WechatWsClient) DeviceAuth2() error {
+	sendMsg := &workphone.DeviceAuthReqMessage{
+		AuthType:   workphone.DeviceAuthReqMessage_InternalCode,
+		Credential: "",
+	}
+
+	content, err := anypb.New(sendMsg)
+	if err != nil {
+		logx.Error(err)
+		return err
+	}
+
+	logx.Info("content is ", content)
+
+	transportMessage := &workphone.TransportMessage{
+		Id:      int64(workphone.EnumMsgType_DeviceAuthReq),
+		MsgType: workphone.EnumMsgType_DeviceAuthReq,
+		Content: content,
+	}
+
+	transportMessageJSON, err := protojson.MarshalOptions{
+		UseProtoNames: true,
+	}.Marshal(transportMessage)
+
+	if err != nil {
+		logx.Error(err)
+		return err
+	}
+
+	logx.Info(string(transportMessageJSON))
+
+	err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
+	if err != nil {
+		logx.Error(err)
+		return err
+	}
+
+	return nil
+
+}
+
+// DeviceAuth 连接认证,使用IP白名单方式进行认证
+func (c *WechatWsClient) DeviceAuth() error {
+
+	message := map[string]interface{}{
+		"Id":          1010,
+		"AccessToken": "",
+		"MsgType":     "DeviceAuthReq",
+		"Content": map[string]interface{}{
+			"AuthType":   3,
+			"Credential": "",
+		},
+	}
+	transportMessageJSON, err := json.Marshal(message)
+	if err != nil {
+		logx.Error(err)
+		return err
+	}
+
+	err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
+	if err != nil {
+		logx.Error(err)
+		return err
+	}
+
+	for {
+		_, msgByte, err := c.Conn.ReadMessage()
+		if err != nil {
+			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+				logx.Errorf("IsUnexpectedCloseError error: %v", err)
+			} else {
+				logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
+			}
+			break
+		}
+		msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
+
+		logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
+
+		var msg MsgJsonObject
+
+		err = json.Unmarshal(msgByte, &msg)
+		if err != nil {
+			logx.Error(err)
+			continue
+		}
+
+		if msg.MsgType != "DeviceAuthRsp" {
+			logx.Error("不是连接认证消息,丢弃~")
+			continue
+		}
+
+		var deviceAuthRsp workphone.DeviceAuthRspMessage
+		err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
+		if err != nil {
+			logx.Error(err)
+			continue
+		}
+		logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
+		c.AccessToken = deviceAuthRsp.AccessToken
+		break
+	}
+
+	return nil
+
+}

+ 18 - 0
internal/svc/service_context.go

@@ -4,6 +4,7 @@ import (
 	"github.com/redis/go-redis/v9"
 	"wechat-api/internal/config"
 	"wechat-api/internal/middleware"
+	"wechat-api/internal/pkg/wechat_ws"
 
 	"github.com/suyuan32/simple-admin-core/rpc/coreclient"
 	"github.com/zeromicro/go-zero/core/logx"
@@ -23,6 +24,7 @@ type ServiceContext struct {
 	DB          *ent.Client
 	CoreRpc     coreclient.Core
 	Rds         redis.UniversalClient
+	WechatWs    map[string]*wechat_ws.WechatWsClient
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
@@ -39,6 +41,21 @@ func NewServiceContext(c config.Config) *ServiceContext {
 
 	coreRpc := coreclient.NewCore(zrpc.NewClientIfEnable(c.CoreRpc))
 
+	// 初始化微信ws客户端
+	// todo 现在配置是从 config.yaml中读取的,后续需要改成从数据库中读取,以便匹配不同的微信号
+	var wsClients map[string]*wechat_ws.WechatWsClient
+	for _, ws := range c.WeChatWs {
+		client, err := wechat_ws.NewWechatWsClient(ws.Url, ws.Appid)
+		if err != nil {
+			logx.Error(err)
+			continue
+		}
+		go client.WritePump()
+		go client.ReadPump(nil)
+
+		wsClients[ws.Appid] = client
+	}
+
 	return &ServiceContext{
 		Config:      c,
 		Authority:   middleware.NewAuthorityMiddleware(cbn, rds, coreRpc).Handle,
@@ -46,5 +63,6 @@ func NewServiceContext(c config.Config) *ServiceContext {
 		DB:          db,
 		CoreRpc:     coreRpc,
 		Rds:         rds,
+		WechatWs:    wsClients,
 	}
 }

+ 6 - 0
internal/types/ws.go

@@ -0,0 +1,6 @@
+package types
+
+type WsConfig struct {
+	Url   string
+	Appid string
+}