// Package wechat_ws provides a WebSocket client that authenticates against the
// workphone service, keeps the session alive with periodic heartbeats and
// dispatches incoming JSON messages to a caller-supplied handler.
package wechat_ws

import (
	"bytes"
	"encoding/json"
	"time"

	"github.com/gorilla/websocket"
	"github.com/zeromicro/go-zero/core/logx"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/types/known/anypb"

	"wechat-api/workphone"
)

const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the peer.
	pongWait = 30 * time.Second

	// pingPeriod is the interval between heartbeats sent to the peer.
	// Must be less than pongWait.
	pingPeriod = (pongWait * 4) / 10

	// maxMessageSize is the maximum message size allowed from the peer.
	maxMessageSize = 2048
)

var (
	newline = []byte{'\n'}
	space   = []byte{' '}
)

// WechatWsClient wraps a single WebSocket connection to the workphone service.
type WechatWsClient struct {
	// Conn is the underlying WebSocket connection.
	Conn *websocket.Conn
	// AccessToken is filled in by DeviceAuth and sent with every heartbeat.
	AccessToken string
	// Send queues outgoing messages that WritePump writes in order.
	Send chan []byte
	// AppId is the application identifier supplied by the caller.
	AppId string
}

// NewWechatWsClient dials urlStr, runs the device authentication handshake and
// returns a ready client. If dialing or authentication fails it returns a nil
// client and the error; the dialed connection is closed on authentication
// failure so that it does not leak.
func NewWechatWsClient(urlStr string, appid string) (*WechatWsClient, error) {
	logx.Debug("实例开始")
	c, _, err := websocket.DefaultDialer.Dial(urlStr, nil)
	if err != nil {
		logx.Error(err)
		return nil, err
	}

	client := &WechatWsClient{
		Conn:        c,
		AccessToken: "",
		Send:        make(chan []byte, 256),
		AppId:       appid,
	}

	if err = client.DeviceAuth(); err != nil {
		// The caller never sees this connection, so it must be released here.
		if closeErr := c.Close(); closeErr != nil {
			logx.Error(closeErr)
		}
		return nil, err
	}
	return client, nil
}

// SendMsg writes message to the connection immediately as a text frame and
// returns any error, so the caller can find out why a send failed.
func (c *WechatWsClient) SendMsg(message []byte) error {
	logx.Info("发送消息:", string(message))
	if err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
		return err
	}
	return c.Conn.WriteMessage(websocket.TextMessage, message)
}

// SendMsgByChan queues msg on the Send channel so that WritePump writes it in
// order. Delivery is asynchronous and no error is reported. The call blocks
// while the Send channel's buffer is full.
func (c *WechatWsClient) SendMsgByChan(msg []byte) {
	c.Send <- msg
}

// WritePump drains the Send channel onto the connection and emits a heartbeat
// every pingPeriod. It returns, closing the connection, when the Send channel
// is closed or when a write fails.
//
// An alternative considered earlier was to batch every queued message into a
// single frame via NextWriter, separating them with newlines. It is not used
// because it is unclear whether that could reorder messages or whether the
// server supports it.
func (c *WechatWsClient) WritePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		// Best-effort close: the pump is exiting and nobody can act on the error.
		_ = c.Conn.Close()
	}()

	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				// A closed channel is always ready to receive, so looping here
				// would spin. Tell the peer we are done and stop instead.
				_ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
				_ = c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			if err := c.SendMsg(message); err != nil {
				// A failed write leaves the connection unusable for further writes.
				logx.Error(err)
				return
			}

		case <-ticker.C:
			if c.AccessToken == "" {
				logx.Error("accessToken is empty")
				continue
			}
			heartbeat := map[string]interface{}{
				"Id":          1001,
				"MsgType":     "HeartBeatReq",
				"AccessToken": c.AccessToken,
				"Content": map[string]string{
					"token": c.AccessToken,
				},
			}
			// The connection keeps the last write deadline set by SendMsg.
			// Refresh it so the heartbeat is not written against a deadline
			// that has already expired.
			if err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
				logx.Error(err)
				return
			}
			if err := c.Conn.WriteJSON(heartbeat); err != nil {
				logx.Error(err)
				return
			}
			logx.Info("发送心跳保活~~")
		}
	}
}

// ReadPump reads messages from the connection until a read fails, then closes
// the connection. Each message is decoded into a MsgJsonObject; messages of
// type "MsgReceivedAck" are only logged, and every other message is passed to
// handler when handler is non-nil. Handler errors are logged and do not stop
// the loop.
func (c *WechatWsClient) ReadPump(handler MessageHandler) {
	defer func() {
		// Best-effort close: the pump is exiting and nobody can act on the error.
		_ = c.Conn.Close()
	}()

	c.Conn.SetReadLimit(maxMessageSize)
	// A zero time means reads never time out.
	if err := c.Conn.SetReadDeadline(time.Time{}); err != nil {
		logx.Errorf("SetReadDeadline error: %v", err)
		return
	}

	for {
		_, raw, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				logx.Errorf("IsUnexpectedCloseError error: %v", err)
			} else {
				logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
			}
			return
		}

		// Flatten embedded newlines so the payload logs as a single line.
		raw = bytes.TrimSpace(bytes.ReplaceAll(raw, newline, space))
		logx.Debugf("message 获取的消息原型是 : %s", string(raw))

		var msg MsgJsonObject
		if err = json.Unmarshal(raw, &msg); err != nil {
			logx.Error(err)
			continue
		}

		if msg.MsgType == "MsgReceivedAck" {
			// Acknowledgement only; nothing to process.
			logx.Info("心跳确认消息已收到,无需处理")
			continue
		}
		if handler == nil {
			continue
		}
		if err = handler(msg); err != nil {
			logx.Error(err)
		}
	}
}

// DeviceAuth2 sends a device authentication request built from the protobuf
// message types and serialised with protojson.
//
// TODO: the payload produced here contains an "@type" field, which fails
// registration. Revisit when time permits, since this approach is the more
// elegant one.
//
// Deprecated: use DeviceAuth for device authentication instead.
func (c *WechatWsClient) DeviceAuth2() error {
	authReq := &workphone.DeviceAuthReqMessage{
		AuthType:   workphone.DeviceAuthReqMessage_InternalCode,
		Credential: "",
	}
	content, err := anypb.New(authReq)
	if err != nil {
		logx.Error(err)
		return err
	}
	logx.Info("content is ", content)

	transportMessage := &workphone.TransportMessage{
		Id:      int64(workphone.EnumMsgType_DeviceAuthReq),
		MsgType: workphone.EnumMsgType_DeviceAuthReq,
		Content: content,
	}
	marshaler := protojson.MarshalOptions{UseProtoNames: true}
	payload, err := marshaler.Marshal(transportMessage)
	if err != nil {
		logx.Error(err)
		return err
	}
	logx.Info(string(payload))

	if err = c.Conn.WriteMessage(websocket.TextMessage, payload); err != nil {
		logx.Error(err)
		return err
	}
	return nil
}

// DeviceAuth performs connection authentication using the IP whitelist method.
// It sends a DeviceAuthReq and then reads messages until a DeviceAuthRsp is
// decoded, storing its access token in c.AccessToken. Messages of any other
// type, and messages that fail to decode, are logged and skipped.
//
// It returns an error if the request cannot be encoded or written, or if
// reading from the connection fails before a response is decoded.
func (c *WechatWsClient) DeviceAuth() error {
	request := map[string]interface{}{
		"Id":          1010,
		"AccessToken": "",
		"MsgType":     "DeviceAuthReq",
		"Content": map[string]interface{}{
			"AuthType":   3,
			"Credential": "",
		},
	}
	payload, err := json.Marshal(request)
	if err != nil {
		logx.Error(err)
		return err
	}
	if err = c.Conn.WriteMessage(websocket.TextMessage, payload); err != nil {
		logx.Error(err)
		return err
	}

	for {
		_, raw, err := c.Conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				logx.Errorf("IsUnexpectedCloseError error: %v", err)
			} else {
				logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
			}
			// Report the failure; returning nil here would hand the caller a
			// client that looks authenticated but has no access token.
			return err
		}

		// Flatten embedded newlines so the payload logs as a single line.
		raw = bytes.TrimSpace(bytes.ReplaceAll(raw, newline, space))
		logx.Debugf("message 获取的消息原型是 : %s", string(raw))

		var msg MsgJsonObject
		if err = json.Unmarshal(raw, &msg); err != nil {
			logx.Error(err)
			continue
		}
		if msg.MsgType != "DeviceAuthRsp" {
			logx.Error("不是连接认证消息,丢弃~")
			continue
		}

		var deviceAuthRsp workphone.DeviceAuthRspMessage
		if err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp); err != nil {
			logx.Error(err)
			continue
		}

		logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
		c.AccessToken = deviceAuthRsp.AccessToken
		return nil
	}
}