wechat_ws_client.go 7.2 KB


  1. package wechat_ws
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "github.com/gorilla/websocket"
  6. "github.com/zeromicro/go-zero/core/logx"
  7. "google.golang.org/protobuf/encoding/protojson"
  8. "google.golang.org/protobuf/types/known/anypb"
  9. "time"
  10. "wechat-api/workphone"
  11. )
  12. const (
  13. // Time allowed to write a message to the peer.
  14. writeWait = 10 * time.Second
  15. // Time allowed to read the next pong message from the peer.
  16. pongWait = 30 * time.Second
  17. // Send pings to peer with this period. Must be less than pongWait.
  18. pingPeriod = (pongWait * 4) / 10
  19. // Maximum message size allowed from peer.
  20. maxMessageSize = 2048
  21. )
  22. var (
  23. newline = []byte{'\n'}
  24. space = []byte{' '}
  25. )
  26. type WechatWsClient struct {
  27. Conn *websocket.Conn
  28. AccessToken string
  29. Send chan []byte
  30. AppId string
  31. }
  32. func NewWechatWsClient(urlStr string, appid string) (*WechatWsClient, error) {
  33. logx.Debug("实例开始")
  34. c, _, err := websocket.DefaultDialer.Dial(urlStr, nil)
  35. if err != nil {
  36. logx.Error(err)
  37. return nil, err
  38. }
  39. client := &WechatWsClient{
  40. Conn: c,
  41. AccessToken: "",
  42. Send: make(chan []byte, 256),
  43. AppId: appid,
  44. }
  45. err = client.DeviceAuth()
  46. if err != nil {
  47. return nil, err
  48. }
  49. return client, nil
  50. }
  51. // SendMsg 立刻发送消息,发送不成功可以获取错误信息
  52. func (c *WechatWsClient) SendMsg(message []byte) error {
  53. logx.Info("发送消息:", string(message))
  54. err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  55. if err != nil {
  56. return err
  57. }
  58. err = c.Conn.WriteMessage(websocket.TextMessage, message)
  59. if err != nil {
  60. return err
  61. }
  62. return nil
  63. }
  64. // SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
  65. func (c *WechatWsClient) SendMsgByChan(msg []byte) {
  66. c.Send <- msg
  67. }
  68. func (c *WechatWsClient) WritePump() {
  69. ticker := time.NewTicker(pingPeriod)
  70. defer func() {
  71. ticker.Stop()
  72. err := c.Conn.Close()
  73. if err != nil {
  74. return
  75. }
  76. }()
  77. for {
  78. select {
  79. case message, ok := <-c.Send:
  80. if !ok {
  81. continue
  82. }
  83. _ = c.SendMsg(message)
  84. // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
  85. //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  86. //if !ok {
  87. // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  88. // return
  89. //}
  90. //
  91. //w, err := c.conn.NextWriter(websocket.TextMessage)
  92. //if err != nil {
  93. // return
  94. //}
  95. //_, _ = w.Write(message)
  96. //
  97. //// Add queued chat messages to the current websocket message.
  98. //n := len(c.send)
  99. //for i := 0; i < n; i++ {
  100. // _, _ = w.Write(newline)
  101. // _, _ = w.Write(<-c.send)
  102. //}
  103. //
  104. //if err := w.Close(); err != nil {
  105. // return
  106. //}
  107. case <-ticker.C:
  108. if c.AccessToken == "" {
  109. logx.Error("accessToken is empty")
  110. continue
  111. }
  112. message := map[string]interface{}{
  113. "Id": 1001,
  114. "MsgType": "HeartBeatReq",
  115. "AccessToken": c.AccessToken,
  116. "Content": map[string]string{
  117. "token": c.AccessToken,
  118. },
  119. }
  120. //transportMessageJSON, err := json.Marshal(message)
  121. //if err != nil {
  122. // logx.Error(err)
  123. // continue
  124. //}
  125. //
  126. _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  127. //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  128. err := c.Conn.WriteJSON(message)
  129. if err != nil {
  130. logx.Error(err)
  131. continue
  132. }
  133. logx.Info("发送心跳保活~~")
  134. }
  135. }
  136. }
  137. func (c *WechatWsClient) ReadPump(handler MessageHandler) {
  138. defer func() {
  139. err := c.Conn.Close()
  140. if err != nil {
  141. return
  142. }
  143. }()
  144. c.Conn.SetReadLimit(maxMessageSize)
  145. err := c.Conn.SetReadDeadline(time.Time{})
  146. if err != nil {
  147. logx.Errorf("SetReadDeadline error: %v", err)
  148. return
  149. }
  150. //c.conn.SetPongHandler(func(string) error {
  151. // err := c.conn.SetReadDeadline(time.Time{})
  152. // if err != nil {
  153. // return err
  154. // }
  155. // return nil
  156. //})
  157. for {
  158. _, message, err := c.Conn.ReadMessage()
  159. if err != nil {
  160. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  161. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  162. } else {
  163. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  164. }
  165. break
  166. }
  167. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  168. logx.Debugf("message 获取的消息原型是 : %s", string(message))
  169. var msg MsgJsonObject
  170. //err = c.Conn.ReadJSON(msg)
  171. err = json.Unmarshal(message, &msg)
  172. if err != nil {
  173. logx.Error(err)
  174. continue
  175. }
  176. //logx.Info("message 获取的消息原型是 : ", msg)
  177. switch msg.MsgType {
  178. case "MsgReceivedAck":
  179. logx.Info("心跳确认消息已收到,无需处理")
  180. default:
  181. if handler != nil {
  182. err = handler(msg)
  183. if err != nil {
  184. logx.Error(err)
  185. }
  186. }
  187. }
  188. }
  189. }
  190. // DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
  191. //
  192. //go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
  193. func (c *WechatWsClient) DeviceAuth2() error {
  194. sendMsg := &workphone.DeviceAuthReqMessage{
  195. AuthType: workphone.DeviceAuthReqMessage_InternalCode,
  196. Credential: "",
  197. }
  198. content, err := anypb.New(sendMsg)
  199. if err != nil {
  200. logx.Error(err)
  201. return err
  202. }
  203. logx.Info("content is ", content)
  204. transportMessage := &workphone.TransportMessage{
  205. Id: int64(workphone.EnumMsgType_DeviceAuthReq),
  206. MsgType: workphone.EnumMsgType_DeviceAuthReq,
  207. Content: content,
  208. }
  209. transportMessageJSON, err := protojson.MarshalOptions{
  210. UseProtoNames: true,
  211. }.Marshal(transportMessage)
  212. if err != nil {
  213. logx.Error(err)
  214. return err
  215. }
  216. logx.Info(string(transportMessageJSON))
  217. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  218. if err != nil {
  219. logx.Error(err)
  220. return err
  221. }
  222. return nil
  223. }
  224. // DeviceAuth 连接认证,使用IP白名单方式进行认证
  225. func (c *WechatWsClient) DeviceAuth() error {
  226. message := map[string]interface{}{
  227. "Id": 1010,
  228. "AccessToken": "",
  229. "MsgType": "DeviceAuthReq",
  230. "Content": map[string]interface{}{
  231. "AuthType": 3,
  232. "Credential": "",
  233. },
  234. }
  235. transportMessageJSON, err := json.Marshal(message)
  236. if err != nil {
  237. logx.Error(err)
  238. return err
  239. }
  240. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  241. if err != nil {
  242. logx.Error(err)
  243. return err
  244. }
  245. for {
  246. _, msgByte, err := c.Conn.ReadMessage()
  247. if err != nil {
  248. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  249. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  250. } else {
  251. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  252. }
  253. break
  254. }
  255. msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
  256. logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
  257. var msg MsgJsonObject
  258. err = json.Unmarshal(msgByte, &msg)
  259. if err != nil {
  260. logx.Error(err)
  261. continue
  262. }
  263. if msg.MsgType != "DeviceAuthRsp" {
  264. logx.Error("不是连接认证消息,丢弃~")
  265. continue
  266. }
  267. var deviceAuthRsp workphone.DeviceAuthRspMessage
  268. err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
  269. if err != nil {
  270. logx.Error(err)
  271. continue
  272. }
  273. logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
  274. c.AccessToken = deviceAuthRsp.AccessToken
  275. break
  276. }
  277. return nil
  278. }