wechat_ws_client.go 8.4 KB


  1. package wechat_ws
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "github.com/gorilla/websocket"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "google.golang.org/protobuf/encoding/protojson"
  9. "google.golang.org/protobuf/types/known/anypb"
  10. "net/http"
  11. "net/url"
  12. "time"
  13. "wechat-api/workphone"
  14. )
  // Timing and size limits shared by the websocket read/write code.
  const (
  	// writeWait is the deadline given to each write to the peer.
  	writeWait = 10 * time.Second

  	// pongWait is the time allowed to read the next pong message from the
  	// peer. In this file it only serves as the base for pingPeriod.
  	pongWait = 30 * time.Second

  	// pingPeriod is the heartbeat interval: 40% of pongWait, which keeps it
  	// below pongWait as required.
  	pingPeriod = pongWait * 4 / 10

  	// maxMessageSize is the maximum message size allowed from the peer.
  	// NOTE(review): no active code visible in this file applies this limit.
  	maxMessageSize = 4096
  )
  // Byte sequences used when normalising incoming frames: every newline in a
  // received message is replaced by a single space before it is parsed.
  var (
  	newline = []byte("\n")
  	space   = []byte(" ")
  )
  // WechatWsClient bundles one websocket connection with the state used by
  // the read and write pumps defined in this file.
  type WechatWsClient struct {
  	// Conn is the underlying websocket connection.
  	Conn *websocket.Conn
  	// AccessToken is stored by DeviceAuth after a successful authentication
  	// response and is included in every heartbeat sent by WritePump.
  	AccessToken string
  	// Send is the outbound message queue drained by WritePump.
  	Send chan []byte
  	// Name is the client name passed to NewWechatWsClient; it appears in logs.
  	Name string
  	// CTypes holds the ctype argument passed to NewWechatWsClient.
  	// NOTE(review): its meaning is not visible in this file.
  	CTypes string
  	// MessageHandler lists the callbacks ReadPump invokes for every received
  	// message whose MsgType is not "MsgReceivedAck".
  	MessageHandler []MessageHandler
  }
  37. func NewWechatWsClient(urlStr string, name string, ctype string) (*WechatWsClient, error) {
  38. logx.Debug("实例开始")
  39. //p, _ := url.Parse("http://127.0.0.1:7897")
  40. var d = websocket.Dialer{
  41. Proxy: http.ProxyURL(&url.URL{
  42. Scheme: "http", // or "https" depending on your proxy
  43. Host: "127.0.0.1:7897",
  44. Path: "/",
  45. }),
  46. }
  47. c, _, err := d.Dial(urlStr, nil)
  48. if err != nil {
  49. logx.Error(err)
  50. return nil, err
  51. }
  52. client := &WechatWsClient{
  53. Conn: c,
  54. AccessToken: "",
  55. Send: make(chan []byte, 256),
  56. Name: name,
  57. CTypes: ctype,
  58. MessageHandler: make([]MessageHandler, 0),
  59. }
  60. err = client.DeviceAuth()
  61. if err != nil {
  62. return nil, err
  63. }
  64. return client, nil
  65. }
  66. // SendMsg 立刻发送消息,发送不成功可以获取错误信息
  67. func (c *WechatWsClient) SendMsg(message []byte) error {
  68. logx.Info("发送消息:", string(message))
  69. err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  70. if err != nil {
  71. return err
  72. }
  73. err = c.Conn.WriteMessage(websocket.TextMessage, message)
  74. if err != nil {
  75. return err
  76. }
  77. return nil
  78. }
  79. // SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
  80. func (c *WechatWsClient) SendMsgByChan(msg []byte) {
  81. c.Send <- msg
  82. }
  83. func (c *WechatWsClient) WritePump() {
  84. ticker := time.NewTicker(pingPeriod)
  85. defer func() {
  86. ticker.Stop()
  87. err := c.Conn.Close()
  88. if err != nil {
  89. return
  90. }
  91. }()
  92. for {
  93. select {
  94. case message, ok := <-c.Send:
  95. if !ok {
  96. continue
  97. }
  98. _ = c.SendMsg(message)
  99. // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
  100. //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  101. //if !ok {
  102. // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  103. // return
  104. //}
  105. //
  106. //w, err := c.conn.NextWriter(websocket.TextMessage)
  107. //if err != nil {
  108. // return
  109. //}
  110. //_, _ = w.Write(message)
  111. //
  112. //// Add queued chat messages to the current websocket message.
  113. //n := len(c.send)
  114. //for i := 0; i < n; i++ {
  115. // _, _ = w.Write(newline)
  116. // _, _ = w.Write(<-c.send)
  117. //}
  118. //
  119. //if err := w.Close(); err != nil {
  120. // return
  121. //}
  122. case <-ticker.C:
  123. if c.AccessToken == "" {
  124. logx.Error("accessToken is empty")
  125. continue
  126. }
  127. message := map[string]interface{}{
  128. "Id": 1001,
  129. "MsgType": "HeartBeatReq",
  130. "AccessToken": c.AccessToken,
  131. "Content": map[string]string{
  132. "token": c.AccessToken,
  133. },
  134. }
  135. //transportMessageJSON, err := json.Marshal(message)
  136. //if err != nil {
  137. // logx.Error(err)
  138. // continue
  139. //}
  140. //
  141. _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  142. //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  143. err := c.Conn.WriteJSON(message)
  144. if err != nil {
  145. logx.Error(err)
  146. continue
  147. }
  148. logx.Info("发送心跳保活~~")
  149. }
  150. }
  151. }
  152. func (c *WechatWsClient) RegisterMessageHandler(handler MessageHandler) {
  153. if c == nil {
  154. logx.Error("WechatWsClient is nil in RegisterMessageHandler")
  155. return
  156. }
  157. logx.Info("Registering message handler for WechatWsClient:", c.Name)
  158. c.MessageHandler = append(c.MessageHandler, handler)
  159. }
  160. func (c *WechatWsClient) ReadPump() {
  161. defer func() {
  162. err := c.Conn.Close()
  163. if err != nil {
  164. return
  165. }
  166. }()
  167. //c.Conn.SetReadLimit(maxMessageSize)
  168. //err := c.Conn.SetReadDeadline(time.Time{})
  169. //if err != nil {
  170. // logx.Errorf("SetReadDeadline error: %v", err)
  171. // return
  172. //}
  173. //c.conn.SetPongHandler(func(string) error {
  174. // err := c.conn.SetReadDeadline(time.Time{})
  175. // if err != nil {
  176. // return err
  177. // }
  178. // return nil
  179. //})
  180. for {
  181. _, message, err := c.Conn.ReadMessage()
  182. if err != nil {
  183. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  184. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  185. } else {
  186. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  187. }
  188. break
  189. }
  190. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  191. logx.Debugf("message 获取的消息原型是 : %s", string(message))
  192. var msg MsgJsonObject
  193. //err = c.Conn.ReadJSON(msg)
  194. err = json.Unmarshal(message, &msg)
  195. if err != nil {
  196. logx.Error(err)
  197. continue
  198. }
  199. //logx.Info("message 获取的消息原型是 : ", msg)
  200. switch msg.MsgType {
  201. case "MsgReceivedAck":
  202. logx.Info("心跳确认消息已收到,无需处理")
  203. default:
  204. if len(c.MessageHandler) > 0 {
  205. for _, handler := range c.MessageHandler {
  206. // todo 重启时这里会丢消息,回头需要用wg来进行平滑处理
  207. go func() {
  208. err = handler(&msg)
  209. if err != nil {
  210. logx.Error(err)
  211. }
  212. }()
  213. }
  214. }
  215. }
  216. }
  217. }
  218. // DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
  219. //
  220. //go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
  221. func (c *WechatWsClient) DeviceAuth2() error {
  222. sendMsg := &workphone.DeviceAuthReqMessage{
  223. AuthType: workphone.DeviceAuthReqMessage_InternalCode,
  224. Credential: "",
  225. }
  226. content, err := anypb.New(sendMsg)
  227. if err != nil {
  228. logx.Error(err)
  229. return err
  230. }
  231. logx.Info("content is ", content)
  232. transportMessage := &workphone.TransportMessage{
  233. Id: int64(workphone.EnumMsgType_DeviceAuthReq),
  234. MsgType: workphone.EnumMsgType_DeviceAuthReq,
  235. Content: content,
  236. }
  237. transportMessageJSON, err := protojson.MarshalOptions{
  238. UseProtoNames: true,
  239. }.Marshal(transportMessage)
  240. if err != nil {
  241. logx.Error(err)
  242. return err
  243. }
  244. logx.Info(string(transportMessageJSON))
  245. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  246. if err != nil {
  247. logx.Error(err)
  248. return err
  249. }
  250. return nil
  251. }
  252. // DeviceAuth 连接认证,使用IP白名单方式进行认证
  253. func (c *WechatWsClient) DeviceAuth() error {
  254. message := map[string]interface{}{
  255. "Id": 1010,
  256. "AccessToken": "",
  257. "MsgType": "DeviceAuthReq",
  258. "Content": map[string]interface{}{
  259. "AuthType": 3,
  260. "Credential": "",
  261. },
  262. }
  263. transportMessageJSON, err := json.Marshal(message)
  264. if err != nil {
  265. logx.Error(err)
  266. return err
  267. }
  268. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  269. if err != nil {
  270. logx.Error(err)
  271. return err
  272. }
  273. for {
  274. _, msgByte, err := c.Conn.ReadMessage()
  275. if err != nil {
  276. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  277. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  278. } else {
  279. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  280. }
  281. break
  282. }
  283. msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
  284. logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
  285. var msg MsgJsonObject
  286. err = json.Unmarshal(msgByte, &msg)
  287. if err != nil {
  288. logx.Error(err)
  289. continue
  290. }
  291. if msg.MsgType == "Error" {
  292. var errMsg workphone.ErrorMessage
  293. _ = protojson.Unmarshal([]byte(msg.Message), &errMsg)
  294. logx.Error("连接认证失败,请检查IP白名单是否正确", errMsg)
  295. return errors.New("连接认证失败,请检查IP白名单是否正确")
  296. }
  297. if msg.MsgType != "DeviceAuthRsp" {
  298. logx.Error("不是连接认证消息,丢弃~", string(msgByte))
  299. continue
  300. }
  301. var deviceAuthRsp workphone.DeviceAuthRspMessage
  302. err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
  303. if err != nil {
  304. logx.Error(err)
  305. continue
  306. }
  307. logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
  308. c.AccessToken = deviceAuthRsp.AccessToken
  309. break
  310. }
  311. return nil
  312. }