|
@@ -0,0 +1,323 @@
|
|
|
+package wechat_ws
|
|
|
+
|
|
|
+import (
|
|
|
+ "bytes"
|
|
|
+ "encoding/json"
|
|
|
+ "github.com/gorilla/websocket"
|
|
|
+ "github.com/zeromicro/go-zero/core/logx"
|
|
|
+ "google.golang.org/protobuf/encoding/protojson"
|
|
|
+ "google.golang.org/protobuf/types/known/anypb"
|
|
|
+ "time"
|
|
|
+ "wechat-api/workphone"
|
|
|
+)
|
|
|
+
|
|
|
+const (
|
|
|
+ // Time allowed to write a message to the peer.
|
|
|
+ writeWait = 10 * time.Second
|
|
|
+
|
|
|
+ // Time allowed to read the next pong message from the peer.
|
|
|
+ pongWait = 30 * time.Second
|
|
|
+
|
|
|
+ // Send pings to peer with this period. Must be less than pongWait.
|
|
|
+ pingPeriod = (pongWait * 4) / 10
|
|
|
+
|
|
|
+ // Maximum message size allowed from peer.
|
|
|
+ maxMessageSize = 2048
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ newline = []byte{'\n'}
|
|
|
+ space = []byte{' '}
|
|
|
+)
|
|
|
+
|
|
|
+type WechatWsClient struct {
|
|
|
+ Conn *websocket.Conn
|
|
|
+ AccessToken string
|
|
|
+ Send chan []byte
|
|
|
+ AppId string
|
|
|
+}
|
|
|
+
|
|
|
+func NewWechatWsClient(urlStr string, appid string) (*WechatWsClient, error) {
|
|
|
+ logx.Debug("实例开始")
|
|
|
+ c, _, err := websocket.DefaultDialer.Dial(urlStr, nil)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ client := &WechatWsClient{
|
|
|
+ Conn: c,
|
|
|
+ AccessToken: "",
|
|
|
+ Send: make(chan []byte, 256),
|
|
|
+ AppId: appid,
|
|
|
+ }
|
|
|
+
|
|
|
+ err = client.DeviceAuth()
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return client, nil
|
|
|
+}
|
|
|
+
|
|
|
+// SendMsg 立刻发送消息,发送不成功可以获取错误信息
|
|
|
+func (c *WechatWsClient) SendMsg(message []byte) error {
|
|
|
+ logx.Info("发送消息:", string(message))
|
|
|
+ err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ err = c.Conn.WriteMessage(websocket.TextMessage, message)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
|
|
|
+func (c *WechatWsClient) SendMsgByChan(msg []byte) {
|
|
|
+ c.Send <- msg
|
|
|
+}
|
|
|
+
|
|
|
+func (c *WechatWsClient) WritePump() {
|
|
|
+ ticker := time.NewTicker(pingPeriod)
|
|
|
+ defer func() {
|
|
|
+ ticker.Stop()
|
|
|
+ err := c.Conn.Close()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case message, ok := <-c.Send:
|
|
|
+ if !ok {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ _ = c.SendMsg(message)
|
|
|
+
|
|
|
+ // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
|
|
|
+ //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
+ //if !ok {
|
|
|
+ // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
|
|
+ // return
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //w, err := c.conn.NextWriter(websocket.TextMessage)
|
|
|
+ //if err != nil {
|
|
|
+ // return
|
|
|
+ //}
|
|
|
+ //_, _ = w.Write(message)
|
|
|
+ //
|
|
|
+ //// Add queued chat messages to the current websocket message.
|
|
|
+ //n := len(c.send)
|
|
|
+ //for i := 0; i < n; i++ {
|
|
|
+ // _, _ = w.Write(newline)
|
|
|
+ // _, _ = w.Write(<-c.send)
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ //if err := w.Close(); err != nil {
|
|
|
+ // return
|
|
|
+ //}
|
|
|
+ case <-ticker.C:
|
|
|
+ if c.AccessToken == "" {
|
|
|
+ logx.Error("accessToken is empty")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ message := map[string]interface{}{
|
|
|
+ "Id": 1001,
|
|
|
+ "MsgType": "HeartBeatReq",
|
|
|
+ "AccessToken": c.AccessToken,
|
|
|
+ "Content": map[string]string{
|
|
|
+ "token": c.AccessToken,
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ //transportMessageJSON, err := json.Marshal(message)
|
|
|
+ //if err != nil {
|
|
|
+ // logx.Error(err)
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
+ //
|
|
|
+ _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
|
+ //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
|
|
|
+
|
|
|
+ err := c.Conn.WriteJSON(message)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ logx.Info("发送心跳保活~~")
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (c *WechatWsClient) ReadPump(handler MessageHandler) {
|
|
|
+ defer func() {
|
|
|
+ err := c.Conn.Close()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ c.Conn.SetReadLimit(maxMessageSize)
|
|
|
+ err := c.Conn.SetReadDeadline(time.Time{})
|
|
|
+ if err != nil {
|
|
|
+ logx.Errorf("SetReadDeadline error: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //c.conn.SetPongHandler(func(string) error {
|
|
|
+ // err := c.conn.SetReadDeadline(time.Time{})
|
|
|
+ // if err != nil {
|
|
|
+ // return err
|
|
|
+ // }
|
|
|
+ // return nil
|
|
|
+ //})
|
|
|
+ for {
|
|
|
+ _, message, err := c.Conn.ReadMessage()
|
|
|
+ if err != nil {
|
|
|
+ if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
|
+ logx.Errorf("IsUnexpectedCloseError error: %v", err)
|
|
|
+ } else {
|
|
|
+ logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
|
|
|
+
|
|
|
+ logx.Debugf("message 获取的消息原型是 : %s", string(message))
|
|
|
+
|
|
|
+ var msg MsgJsonObject
|
|
|
+ //err = c.Conn.ReadJSON(msg)
|
|
|
+ err = json.Unmarshal(message, &msg)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //logx.Info("message 获取的消息原型是 : ", msg)
|
|
|
+
|
|
|
+ switch msg.MsgType {
|
|
|
+ case "MsgReceivedAck":
|
|
|
+ logx.Info("心跳确认消息已收到,无需处理")
|
|
|
+ default:
|
|
|
+ if handler != nil {
|
|
|
+ err = handler(msg)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
|
|
|
+//
|
|
|
+//go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
|
|
|
+func (c *WechatWsClient) DeviceAuth2() error {
|
|
|
+ sendMsg := &workphone.DeviceAuthReqMessage{
|
|
|
+ AuthType: workphone.DeviceAuthReqMessage_InternalCode,
|
|
|
+ Credential: "",
|
|
|
+ }
|
|
|
+
|
|
|
+ content, err := anypb.New(sendMsg)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ logx.Info("content is ", content)
|
|
|
+
|
|
|
+ transportMessage := &workphone.TransportMessage{
|
|
|
+ Id: int64(workphone.EnumMsgType_DeviceAuthReq),
|
|
|
+ MsgType: workphone.EnumMsgType_DeviceAuthReq,
|
|
|
+ Content: content,
|
|
|
+ }
|
|
|
+
|
|
|
+ transportMessageJSON, err := protojson.MarshalOptions{
|
|
|
+ UseProtoNames: true,
|
|
|
+ }.Marshal(transportMessage)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ logx.Info(string(transportMessageJSON))
|
|
|
+
|
|
|
+ err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+// DeviceAuth 连接认证,使用IP白名单方式进行认证
|
|
|
+func (c *WechatWsClient) DeviceAuth() error {
|
|
|
+
|
|
|
+ message := map[string]interface{}{
|
|
|
+ "Id": 1010,
|
|
|
+ "AccessToken": "",
|
|
|
+ "MsgType": "DeviceAuthReq",
|
|
|
+ "Content": map[string]interface{}{
|
|
|
+ "AuthType": 3,
|
|
|
+ "Credential": "",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ transportMessageJSON, err := json.Marshal(message)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ for {
|
|
|
+ _, msgByte, err := c.Conn.ReadMessage()
|
|
|
+ if err != nil {
|
|
|
+ if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
|
+ logx.Errorf("IsUnexpectedCloseError error: %v", err)
|
|
|
+ } else {
|
|
|
+ logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
|
|
|
+ }
|
|
|
+ break
|
|
|
+ }
|
|
|
+ msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
|
|
|
+
|
|
|
+ logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
|
|
|
+
|
|
|
+ var msg MsgJsonObject
|
|
|
+
|
|
|
+ err = json.Unmarshal(msgByte, &msg)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ if msg.MsgType != "DeviceAuthRsp" {
|
|
|
+ logx.Error("不是连接认证消息,丢弃~")
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ var deviceAuthRsp workphone.DeviceAuthRspMessage
|
|
|
+ err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
|
|
|
+ if err != nil {
|
|
|
+ logx.Error(err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
|
|
|
+ c.AccessToken = deviceAuthRsp.AccessToken
|
|
|
+ break
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+
|
|
|
+}
|