123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361 |
- package wechat_ws
- import (
- "bytes"
- "encoding/json"
- "errors"
- "github.com/gorilla/websocket"
- "github.com/zeromicro/go-zero/core/logx"
- "google.golang.org/protobuf/encoding/protojson"
- "google.golang.org/protobuf/types/known/anypb"
- "time"
- "wechat-api/workphone"
- )
- const (
- // Time allowed to write a message to the peer.
- writeWait = 10 * time.Second
- // Time allowed to read the next pong message from the peer.
- pongWait = 30 * time.Second
- // Send pings to peer with this period. Must be less than pongWait.
- pingPeriod = (pongWait * 4) / 10
- // Maximum message size allowed from peer.
- maxMessageSize = 4096
- )
- var (
- newline = []byte{'\n'}
- space = []byte{' '}
- )
- type WechatWsClient struct {
- Conn *websocket.Conn
- AccessToken string
- Send chan []byte
- Name string
- CTypes string
- MessageHandler []MessageHandler
- }
- func NewWechatWsClient(urlStr string, name string, ctype string) (*WechatWsClient, error) {
- logx.Debug("实例开始")
- //p, _ := url.Parse("http://127.0.0.1:7897")
- var d = websocket.Dialer{
- //Proxy: http.ProxyURL(&url.URL{
- // Scheme: "http", // or "https" depending on your proxy
- // Host: "127.0.0.1:7897",
- // Path: "/",
- //}),
- }
- c, _, err := d.Dial(urlStr, nil)
- if err != nil {
- logx.Error(err)
- return nil, err
- }
- client := &WechatWsClient{
- Conn: c,
- AccessToken: "",
- Send: make(chan []byte, 256),
- Name: name,
- CTypes: ctype,
- MessageHandler: make([]MessageHandler, 0),
- }
- err = client.DeviceAuth()
- if err != nil {
- return nil, err
- }
- return client, nil
- }
- // SendMsg 立刻发送消息,发送不成功可以获取错误信息
- func (c *WechatWsClient) SendMsg(message []byte) error {
- logx.Info("发送消息:", string(message))
- err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
- if err != nil {
- return err
- }
- err = c.Conn.WriteMessage(websocket.TextMessage, message)
- if err != nil {
- return err
- }
- return nil
- }
- // SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
- func (c *WechatWsClient) SendMsgByChan(msg []byte) {
- c.Send <- msg
- }
- func (c *WechatWsClient) WritePump() {
- ticker := time.NewTicker(pingPeriod)
- defer func() {
- ticker.Stop()
- err := c.Conn.Close()
- if err != nil {
- return
- }
- }()
- for {
- select {
- case message, ok := <-c.Send:
- if !ok {
- continue
- }
- _ = c.SendMsg(message)
- // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
- //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
- //if !ok {
- // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
- // return
- //}
- //
- //w, err := c.conn.NextWriter(websocket.TextMessage)
- //if err != nil {
- // return
- //}
- //_, _ = w.Write(message)
- //
- //// Add queued chat messages to the current websocket message.
- //n := len(c.send)
- //for i := 0; i < n; i++ {
- // _, _ = w.Write(newline)
- // _, _ = w.Write(<-c.send)
- //}
- //
- //if err := w.Close(); err != nil {
- // return
- //}
- case <-ticker.C:
- if c.AccessToken == "" {
- logx.Error("accessToken is empty")
- continue
- }
- message := map[string]interface{}{
- "Id": 1001,
- "MsgType": "HeartBeatReq",
- "AccessToken": c.AccessToken,
- "Content": map[string]string{
- "token": c.AccessToken,
- },
- }
- //transportMessageJSON, err := json.Marshal(message)
- //if err != nil {
- // logx.Error(err)
- // continue
- //}
- //
- _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
- //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
- err := c.Conn.WriteJSON(message)
- if err != nil {
- logx.Error(err)
- continue
- }
- logx.Info("发送心跳保活~~")
- }
- }
- }
- func (c *WechatWsClient) RegisterMessageHandler(handler MessageHandler) {
- if c == nil {
- logx.Error("WechatWsClient is nil in RegisterMessageHandler")
- return
- }
- logx.Info("Registering message handler for WechatWsClient:", c.Name)
- c.MessageHandler = append(c.MessageHandler, handler)
- }
- func (c *WechatWsClient) ReadPump() {
- defer func() {
- err := c.Conn.Close()
- if err != nil {
- return
- }
- }()
- //c.Conn.SetReadLimit(maxMessageSize)
- //err := c.Conn.SetReadDeadline(time.Time{})
- //if err != nil {
- // logx.Errorf("SetReadDeadline error: %v", err)
- // return
- //}
- //c.conn.SetPongHandler(func(string) error {
- // err := c.conn.SetReadDeadline(time.Time{})
- // if err != nil {
- // return err
- // }
- // return nil
- //})
- for {
- _, message, err := c.Conn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- logx.Errorf("IsUnexpectedCloseError error: %v", err)
- } else {
- logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
- }
- break
- }
- message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
- logx.Debugf("message 获取的消息原型是 : %s", string(message))
- var msg MsgJsonObject
- //err = c.Conn.ReadJSON(msg)
- err = json.Unmarshal(message, &msg)
- if err != nil {
- logx.Error(err)
- continue
- }
- //logx.Info("message 获取的消息原型是 : ", msg)
- switch msg.MsgType {
- case "MsgReceivedAck":
- logx.Info("心跳确认消息已收到,无需处理")
- default:
- if len(c.MessageHandler) > 0 {
- for _, handler := range c.MessageHandler {
- // todo 重启时这里会丢消息,回头需要用wg来进行平滑处理
- go func() {
- err = handler(&msg)
- if err != nil {
- logx.Error(err)
- }
- }()
- }
- }
- }
- }
- }
- // DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
- //
- //go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
- func (c *WechatWsClient) DeviceAuth2() error {
- sendMsg := &workphone.DeviceAuthReqMessage{
- AuthType: workphone.DeviceAuthReqMessage_InternalCode,
- Credential: "",
- }
- content, err := anypb.New(sendMsg)
- if err != nil {
- logx.Error(err)
- return err
- }
- logx.Info("content is ", content)
- transportMessage := &workphone.TransportMessage{
- Id: int64(workphone.EnumMsgType_DeviceAuthReq),
- MsgType: workphone.EnumMsgType_DeviceAuthReq,
- Content: content,
- }
- transportMessageJSON, err := protojson.MarshalOptions{
- UseProtoNames: true,
- }.Marshal(transportMessage)
- if err != nil {
- logx.Error(err)
- return err
- }
- logx.Info(string(transportMessageJSON))
- err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
- if err != nil {
- logx.Error(err)
- return err
- }
- return nil
- }
- // DeviceAuth 连接认证,使用IP白名单方式进行认证
- func (c *WechatWsClient) DeviceAuth() error {
- message := map[string]interface{}{
- "Id": 1010,
- "AccessToken": "",
- "MsgType": "DeviceAuthReq",
- "Content": map[string]interface{}{
- "AuthType": 3,
- "Credential": "",
- },
- }
- transportMessageJSON, err := json.Marshal(message)
- if err != nil {
- logx.Error(err)
- return err
- }
- err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
- if err != nil {
- logx.Error(err)
- return err
- }
- for {
- _, msgByte, err := c.Conn.ReadMessage()
- if err != nil {
- if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
- logx.Errorf("IsUnexpectedCloseError error: %v", err)
- } else {
- logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
- }
- break
- }
- msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
- logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
- var msg MsgJsonObject
- err = json.Unmarshal(msgByte, &msg)
- if err != nil {
- logx.Error(err)
- continue
- }
- if msg.MsgType == "Error" {
- var errMsg workphone.ErrorMessage
- _ = protojson.Unmarshal([]byte(msg.Message), &errMsg)
- logx.Error("连接认证失败,请检查IP白名单是否正确", errMsg)
- return errors.New("连接认证失败,请检查IP白名单是否正确")
- }
- if msg.MsgType != "DeviceAuthRsp" {
- logx.Error("不是连接认证消息,丢弃~", string(msgByte))
- continue
- }
- var deviceAuthRsp workphone.DeviceAuthRspMessage
- err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
- if err != nil {
- logx.Error(err)
- continue
- }
- logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
- c.AccessToken = deviceAuthRsp.AccessToken
- break
- }
- return nil
- }
|