wechat_ws_client.go 8.3 KB


  1. package wechat_ws
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "github.com/gorilla/websocket"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "google.golang.org/protobuf/encoding/protojson"
  9. "google.golang.org/protobuf/types/known/anypb"
  10. "time"
  11. "wechat-api/workphone"
  12. )
  13. const (
  14. // Time allowed to write a message to the peer.
  15. writeWait = 10 * time.Second
  16. // Time allowed to read the next pong message from the peer.
  17. pongWait = 30 * time.Second
  18. // Send pings to peer with this period. Must be less than pongWait.
  19. pingPeriod = (pongWait * 4) / 10
  20. // Maximum message size allowed from peer.
  21. maxMessageSize = 4096
  22. )
  23. var (
  24. newline = []byte{'\n'}
  25. space = []byte{' '}
  26. )
// WechatWsClient is a websocket client for the WeChat work-phone service.
// It owns one connection, the access token obtained during device
// authentication, an outbound message queue and the registered inbound
// message handlers.
type WechatWsClient struct {
	// Conn is the underlying websocket connection.
	Conn *websocket.Conn
	// AccessToken is the token returned by DeviceAuth. It is empty until
	// authentication succeeds and is sent with every heartbeat.
	AccessToken string
	// Send queues outbound messages; WritePump drains it one message at a time.
	Send chan []byte
	// Name identifies this client; it is logged when a handler is registered.
	Name string
	// CTypes stores the ctype argument passed to NewWechatWsClient.
	CTypes string
	// MessageHandler lists the callbacks ReadPump invokes for every inbound
	// message other than "MsgReceivedAck".
	MessageHandler []MessageHandler
}
  35. func NewWechatWsClient(urlStr string, name string, ctype string) (*WechatWsClient, error) {
  36. logx.Debug("实例开始")
  37. //p, _ := url.Parse("http://127.0.0.1:7897")
  38. var d = websocket.Dialer{
  39. //Proxy: http.ProxyURL(&url.URL{
  40. // Scheme: "http", // or "https" depending on your proxy
  41. // Host: "127.0.0.1:7897",
  42. // Path: "/",
  43. //}),
  44. }
  45. c, _, err := d.Dial(urlStr, nil)
  46. if err != nil {
  47. logx.Error(err)
  48. return nil, err
  49. }
  50. client := &WechatWsClient{
  51. Conn: c,
  52. AccessToken: "",
  53. Send: make(chan []byte, 256),
  54. Name: name,
  55. CTypes: ctype,
  56. MessageHandler: make([]MessageHandler, 0),
  57. }
  58. err = client.DeviceAuth()
  59. if err != nil {
  60. return nil, err
  61. }
  62. return client, nil
  63. }
  64. // SendMsg 立刻发送消息,发送不成功可以获取错误信息
  65. func (c *WechatWsClient) SendMsg(message []byte) error {
  66. logx.Info("发送消息:", string(message))
  67. err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  68. if err != nil {
  69. return err
  70. }
  71. err = c.Conn.WriteMessage(websocket.TextMessage, message)
  72. if err != nil {
  73. return err
  74. }
  75. return nil
  76. }
  77. // SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
  78. func (c *WechatWsClient) SendMsgByChan(msg []byte) {
  79. c.Send <- msg
  80. }
  81. func (c *WechatWsClient) WritePump() {
  82. ticker := time.NewTicker(pingPeriod)
  83. defer func() {
  84. ticker.Stop()
  85. err := c.Conn.Close()
  86. if err != nil {
  87. return
  88. }
  89. }()
  90. for {
  91. select {
  92. case message, ok := <-c.Send:
  93. if !ok {
  94. continue
  95. }
  96. _ = c.SendMsg(message)
  97. // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
  98. //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  99. //if !ok {
  100. // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  101. // return
  102. //}
  103. //
  104. //w, err := c.conn.NextWriter(websocket.TextMessage)
  105. //if err != nil {
  106. // return
  107. //}
  108. //_, _ = w.Write(message)
  109. //
  110. //// Add queued chat messages to the current websocket message.
  111. //n := len(c.send)
  112. //for i := 0; i < n; i++ {
  113. // _, _ = w.Write(newline)
  114. // _, _ = w.Write(<-c.send)
  115. //}
  116. //
  117. //if err := w.Close(); err != nil {
  118. // return
  119. //}
  120. case <-ticker.C:
  121. if c.AccessToken == "" {
  122. logx.Error("accessToken is empty")
  123. continue
  124. }
  125. message := map[string]interface{}{
  126. "Id": 1001,
  127. "MsgType": "HeartBeatReq",
  128. "AccessToken": c.AccessToken,
  129. "Content": map[string]string{
  130. "token": c.AccessToken,
  131. },
  132. }
  133. //transportMessageJSON, err := json.Marshal(message)
  134. //if err != nil {
  135. // logx.Error(err)
  136. // continue
  137. //}
  138. //
  139. _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  140. //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  141. err := c.Conn.WriteJSON(message)
  142. if err != nil {
  143. logx.Error(err)
  144. continue
  145. }
  146. logx.Info("发送心跳保活~~")
  147. }
  148. }
  149. }
  150. func (c *WechatWsClient) RegisterMessageHandler(handler MessageHandler) {
  151. if c == nil {
  152. logx.Error("WechatWsClient is nil in RegisterMessageHandler")
  153. return
  154. }
  155. logx.Info("Registering message handler for WechatWsClient:", c.Name)
  156. c.MessageHandler = append(c.MessageHandler, handler)
  157. }
  158. func (c *WechatWsClient) ReadPump() {
  159. defer func() {
  160. err := c.Conn.Close()
  161. if err != nil {
  162. return
  163. }
  164. }()
  165. //c.Conn.SetReadLimit(maxMessageSize)
  166. //err := c.Conn.SetReadDeadline(time.Time{})
  167. //if err != nil {
  168. // logx.Errorf("SetReadDeadline error: %v", err)
  169. // return
  170. //}
  171. //c.conn.SetPongHandler(func(string) error {
  172. // err := c.conn.SetReadDeadline(time.Time{})
  173. // if err != nil {
  174. // return err
  175. // }
  176. // return nil
  177. //})
  178. for {
  179. _, message, err := c.Conn.ReadMessage()
  180. if err != nil {
  181. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  182. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  183. } else {
  184. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  185. }
  186. break
  187. }
  188. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  189. logx.Debugf("message 获取的消息原型是 : %s", string(message))
  190. var msg MsgJsonObject
  191. //err = c.Conn.ReadJSON(msg)
  192. err = json.Unmarshal(message, &msg)
  193. if err != nil {
  194. logx.Error(err)
  195. continue
  196. }
  197. //logx.Info("message 获取的消息原型是 : ", msg)
  198. switch msg.MsgType {
  199. case "MsgReceivedAck":
  200. logx.Info("心跳确认消息已收到,无需处理")
  201. default:
  202. if len(c.MessageHandler) > 0 {
  203. for _, handler := range c.MessageHandler {
  204. // todo 重启时这里会丢消息,回头需要用wg来进行平滑处理
  205. go func() {
  206. err = handler(&msg)
  207. if err != nil {
  208. logx.Error(err)
  209. }
  210. }()
  211. }
  212. }
  213. }
  214. }
  215. }
  216. // DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
  217. //
  218. //go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
  219. func (c *WechatWsClient) DeviceAuth2() error {
  220. sendMsg := &workphone.DeviceAuthReqMessage{
  221. AuthType: workphone.DeviceAuthReqMessage_InternalCode,
  222. Credential: "",
  223. }
  224. content, err := anypb.New(sendMsg)
  225. if err != nil {
  226. logx.Error(err)
  227. return err
  228. }
  229. logx.Info("content is ", content)
  230. transportMessage := &workphone.TransportMessage{
  231. Id: int64(workphone.EnumMsgType_DeviceAuthReq),
  232. MsgType: workphone.EnumMsgType_DeviceAuthReq,
  233. Content: content,
  234. }
  235. transportMessageJSON, err := protojson.MarshalOptions{
  236. UseProtoNames: true,
  237. }.Marshal(transportMessage)
  238. if err != nil {
  239. logx.Error(err)
  240. return err
  241. }
  242. logx.Info(string(transportMessageJSON))
  243. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  244. if err != nil {
  245. logx.Error(err)
  246. return err
  247. }
  248. return nil
  249. }
  250. // DeviceAuth 连接认证,使用IP白名单方式进行认证
  251. func (c *WechatWsClient) DeviceAuth() error {
  252. message := map[string]interface{}{
  253. "Id": 1010,
  254. "AccessToken": "",
  255. "MsgType": "DeviceAuthReq",
  256. "Content": map[string]interface{}{
  257. "AuthType": 3,
  258. "Credential": "",
  259. },
  260. }
  261. transportMessageJSON, err := json.Marshal(message)
  262. if err != nil {
  263. logx.Error(err)
  264. return err
  265. }
  266. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  267. if err != nil {
  268. logx.Error(err)
  269. return err
  270. }
  271. for {
  272. _, msgByte, err := c.Conn.ReadMessage()
  273. if err != nil {
  274. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  275. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  276. } else {
  277. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  278. }
  279. break
  280. }
  281. msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
  282. logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
  283. var msg MsgJsonObject
  284. err = json.Unmarshal(msgByte, &msg)
  285. if err != nil {
  286. logx.Error(err)
  287. continue
  288. }
  289. if msg.MsgType == "Error" {
  290. var errMsg workphone.ErrorMessage
  291. _ = protojson.Unmarshal([]byte(msg.Message), &errMsg)
  292. logx.Error("连接认证失败,请检查IP白名单是否正确", errMsg)
  293. return errors.New("连接认证失败,请检查IP白名单是否正确")
  294. }
  295. if msg.MsgType != "DeviceAuthRsp" {
  296. logx.Error("不是连接认证消息,丢弃~", string(msgByte))
  297. continue
  298. }
  299. var deviceAuthRsp workphone.DeviceAuthRspMessage
  300. err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
  301. if err != nil {
  302. logx.Error(err)
  303. continue
  304. }
  305. logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
  306. c.AccessToken = deviceAuthRsp.AccessToken
  307. break
  308. }
  309. return nil
  310. }