wechat_ws_client.go 8.5 KB


  1. package wechat_ws
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "github.com/gorilla/websocket"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "google.golang.org/protobuf/encoding/protojson"
  9. "google.golang.org/protobuf/types/known/anypb"
  10. "net/http"
  11. "net/url"
  12. "time"
  13. "wechat-api/workphone"
  14. )
  15. const (
  16. // Time allowed to write a message to the peer.
  17. writeWait = 10 * time.Second
  18. // Time allowed to read the next pong message from the peer.
  19. pongWait = 30 * time.Second
  20. // Send pings to peer with this period. Must be less than pongWait.
  21. pingPeriod = (pongWait * 4) / 10
  22. // Maximum message size allowed from peer.
  23. maxMessageSize = 4096
  24. )
  25. var (
  26. newline = []byte{'\n'}
  27. space = []byte{' '}
  28. )
  29. type WechatWsClient struct {
  30. Conn *websocket.Conn
  31. AccessToken string
  32. Send chan []byte
  33. Name string
  34. CTypes string
  35. MessageHandler []MessageHandler
  36. }
  37. func NewWechatWsClient(urlStr string, name string, ctype string) (*WechatWsClient, error) {
  38. logx.Debug("实例开始")
  39. //p, _ := url.Parse("http://127.0.0.1:7897")
  40. var d = websocket.Dialer{
  41. Proxy: http.ProxyURL(&url.URL{
  42. Scheme: "http", // or "https" depending on your proxy
  43. Host: "127.0.0.1:7897",
  44. Path: "/",
  45. }),
  46. }
  47. c, _, err := d.Dial(urlStr, nil)
  48. if err != nil {
  49. logx.Error(err)
  50. return nil, err
  51. }
  52. client := &WechatWsClient{
  53. Conn: c,
  54. AccessToken: "",
  55. Send: make(chan []byte, 256),
  56. Name: name,
  57. CTypes: ctype,
  58. MessageHandler: make([]MessageHandler, 0),
  59. }
  60. err = client.DeviceAuth()
  61. if err != nil {
  62. return nil, err
  63. }
  64. return client, nil
  65. }
  66. // SendMsg 立刻发送消息,发送不成功可以获取错误信息
  67. func (c *WechatWsClient) SendMsg(message []byte) error {
  68. logx.Info("发送消息:", string(message))
  69. err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  70. if err != nil {
  71. return err
  72. }
  73. err = c.Conn.WriteMessage(websocket.TextMessage, message)
  74. if err != nil {
  75. return err
  76. }
  77. return nil
  78. }
  79. // SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
  80. func (c *WechatWsClient) SendMsgByChan(msg []byte) {
  81. c.Send <- msg
  82. }
  83. func (c *WechatWsClient) WritePump() {
  84. ticker := time.NewTicker(pingPeriod)
  85. defer func() {
  86. ticker.Stop()
  87. err := c.Conn.Close()
  88. if err != nil {
  89. return
  90. }
  91. }()
  92. for {
  93. select {
  94. case message, ok := <-c.Send:
  95. if !ok {
  96. continue
  97. }
  98. _ = c.SendMsg(message)
  99. // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
  100. //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  101. //if !ok {
  102. // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  103. // return
  104. //}
  105. //
  106. //w, err := c.conn.NextWriter(websocket.TextMessage)
  107. //if err != nil {
  108. // return
  109. //}
  110. //_, _ = w.Write(message)
  111. //
  112. //// Add queued chat messages to the current websocket message.
  113. //n := len(c.send)
  114. //for i := 0; i < n; i++ {
  115. // _, _ = w.Write(newline)
  116. // _, _ = w.Write(<-c.send)
  117. //}
  118. //
  119. //if err := w.Close(); err != nil {
  120. // return
  121. //}
  122. case <-ticker.C:
  123. if c.AccessToken == "" {
  124. logx.Error("accessToken is empty")
  125. continue
  126. }
  127. message := map[string]interface{}{
  128. "Id": 1001,
  129. "MsgType": "HeartBeatReq",
  130. "AccessToken": c.AccessToken,
  131. "Content": map[string]string{
  132. "token": c.AccessToken,
  133. },
  134. }
  135. //transportMessageJSON, err := json.Marshal(message)
  136. //if err != nil {
  137. // logx.Error(err)
  138. // continue
  139. //}
  140. //
  141. _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  142. //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  143. err := c.Conn.WriteJSON(message)
  144. if err != nil {
  145. logx.Error(err)
  146. continue
  147. }
  148. logx.Info("发送心跳保活~~")
  149. }
  150. }
  151. }
  152. func (c *WechatWsClient) RegisterMessageHandler(handler MessageHandler) {
  153. if c == nil {
  154. logx.Error("WechatWsClient is nil in RegisterMessageHandler")
  155. return
  156. }
  157. logx.Info("Registering message handler for WechatWsClient:", c.Name)
  158. c.MessageHandler = append(c.MessageHandler, handler)
  159. }
  160. func (c *WechatWsClient) ReadPump() {
  161. defer func() {
  162. err := c.Conn.Close()
  163. if err != nil {
  164. return
  165. }
  166. }()
  167. //c.Conn.SetReadLimit(maxMessageSize)
  168. //err := c.Conn.SetReadDeadline(time.Time{})
  169. //if err != nil {
  170. // logx.Errorf("SetReadDeadline error: %v", err)
  171. // return
  172. //}
  173. //c.conn.SetPongHandler(func(string) error {
  174. // err := c.conn.SetReadDeadline(time.Time{})
  175. // if err != nil {
  176. // return err
  177. // }
  178. // return nil
  179. //})
  180. for {
  181. _, message, err := c.Conn.ReadMessage()
  182. if err != nil {
  183. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  184. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  185. } else {
  186. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  187. }
  188. break
  189. }
  190. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  191. logx.Debugf("message 获取的消息原型是 : %s", string(message))
  192. var msg MsgJsonObject
  193. //err = c.Conn.ReadJSON(msg)
  194. err = json.Unmarshal(message, &msg)
  195. if err != nil {
  196. logx.Error(err)
  197. continue
  198. }
  199. //logx.Info("message 获取的消息原型是 : ", msg)
  200. switch msg.MsgType {
  201. case "MsgReceivedAck":
  202. logx.Info("心跳确认消息已收到,无需处理")
  203. default:
  204. if len(c.MessageHandler) > 0 {
  205. for _, handler := range c.MessageHandler {
  206. // todo 重启时这里会丢消息,回头需要用wg来进行平滑处理
  207. go func() {
  208. defer func() {
  209. // 捕获panic, 防止程序崩溃
  210. if r := recover(); r != nil {
  211. logx.Error("Recovered in f", r)
  212. }
  213. }()
  214. err = handler(&msg)
  215. if err != nil {
  216. logx.Error(err)
  217. }
  218. }()
  219. }
  220. }
  221. }
  222. }
  223. }
  224. // DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
  225. //
  226. //go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
  227. func (c *WechatWsClient) DeviceAuth2() error {
  228. sendMsg := &workphone.DeviceAuthReqMessage{
  229. AuthType: workphone.DeviceAuthReqMessage_InternalCode,
  230. Credential: "",
  231. }
  232. content, err := anypb.New(sendMsg)
  233. if err != nil {
  234. logx.Error(err)
  235. return err
  236. }
  237. logx.Info("content is ", content)
  238. transportMessage := &workphone.TransportMessage{
  239. Id: int64(workphone.EnumMsgType_DeviceAuthReq),
  240. MsgType: workphone.EnumMsgType_DeviceAuthReq,
  241. Content: content,
  242. }
  243. transportMessageJSON, err := protojson.MarshalOptions{
  244. UseProtoNames: true,
  245. }.Marshal(transportMessage)
  246. if err != nil {
  247. logx.Error(err)
  248. return err
  249. }
  250. logx.Info(string(transportMessageJSON))
  251. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  252. if err != nil {
  253. logx.Error(err)
  254. return err
  255. }
  256. return nil
  257. }
  258. // DeviceAuth 连接认证,使用IP白名单方式进行认证
  259. func (c *WechatWsClient) DeviceAuth() error {
  260. message := map[string]interface{}{
  261. "Id": 1010,
  262. "AccessToken": "",
  263. "MsgType": "DeviceAuthReq",
  264. "Content": map[string]interface{}{
  265. "AuthType": 3,
  266. "Credential": "",
  267. },
  268. }
  269. transportMessageJSON, err := json.Marshal(message)
  270. if err != nil {
  271. logx.Error(err)
  272. return err
  273. }
  274. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  275. if err != nil {
  276. logx.Error(err)
  277. return err
  278. }
  279. for {
  280. _, msgByte, err := c.Conn.ReadMessage()
  281. if err != nil {
  282. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  283. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  284. } else {
  285. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  286. }
  287. break
  288. }
  289. msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
  290. logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
  291. var msg MsgJsonObject
  292. err = json.Unmarshal(msgByte, &msg)
  293. if err != nil {
  294. logx.Error(err)
  295. continue
  296. }
  297. if msg.MsgType == "Error" {
  298. var errMsg workphone.ErrorMessage
  299. _ = protojson.Unmarshal([]byte(msg.Message), &errMsg)
  300. logx.Error("连接认证失败,请检查IP白名单是否正确", errMsg)
  301. return errors.New("连接认证失败,请检查IP白名单是否正确")
  302. }
  303. if msg.MsgType != "DeviceAuthRsp" {
  304. logx.Error("不是连接认证消息,丢弃~", string(msgByte))
  305. continue
  306. }
  307. var deviceAuthRsp workphone.DeviceAuthRspMessage
  308. err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
  309. if err != nil {
  310. logx.Error(err)
  311. continue
  312. }
  313. logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
  314. c.AccessToken = deviceAuthRsp.AccessToken
  315. break
  316. }
  317. return nil
  318. }