wecom_ws_client.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. package wechat_ws
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "github.com/gorilla/websocket"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "google.golang.org/protobuf/encoding/protojson"
  9. "google.golang.org/protobuf/types/known/anypb"
  10. "time"
  11. "wechat-api/workphone/wecom"
  12. )
  13. type WecomWsClient struct {
  14. Conn *websocket.Conn
  15. AccessToken string
  16. Send chan []byte
  17. Name string
  18. CTypes string
  19. MessageHandler []MessageHandler
  20. }
  21. func NewWecomWsClient(urlStr string, name string, ctype string) (*WecomWsClient, error) {
  22. logx.Debug("实例开始")
  23. //p, _ := url.Parse("http://127.0.0.1:7897")
  24. var d = websocket.Dialer{
  25. //Proxy: http.ProxyURL(&url.URL{
  26. // Scheme: "http", // or "https" depending on your proxy
  27. // Host: "127.0.0.1:7897",
  28. // Path: "/",
  29. //}),
  30. }
  31. c, _, err := d.Dial(urlStr, nil)
  32. if err != nil {
  33. logx.Error(err)
  34. return nil, err
  35. }
  36. client := &WecomWsClient{
  37. Conn: c,
  38. AccessToken: "",
  39. Send: make(chan []byte, 256),
  40. Name: name,
  41. CTypes: ctype,
  42. MessageHandler: make([]MessageHandler, 0),
  43. }
  44. err = client.DeviceAuth()
  45. if err != nil {
  46. return nil, err
  47. }
  48. return client, nil
  49. }
  50. // SendMsg 立刻发送消息,发送不成功可以获取错误信息
  51. func (c *WecomWsClient) SendMsg(message []byte) error {
  52. logx.Info("发送消息:", string(message))
  53. err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  54. if err != nil {
  55. return err
  56. }
  57. err = c.Conn.WriteMessage(websocket.TextMessage, message)
  58. if err != nil {
  59. return err
  60. }
  61. return nil
  62. }
  63. // SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误
  64. func (c *WecomWsClient) SendMsgByChan(msg []byte) {
  65. c.Send <- msg
  66. }
  67. func (c *WecomWsClient) WritePump() {
  68. ticker := time.NewTicker(pingPeriod)
  69. defer func() {
  70. ticker.Stop()
  71. err := c.Conn.Close()
  72. if err != nil {
  73. return
  74. }
  75. }()
  76. for {
  77. select {
  78. case message, ok := <-c.Send:
  79. if !ok {
  80. continue
  81. }
  82. _ = c.SendMsg(message)
  83. // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持
  84. //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
  85. //if !ok {
  86. // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
  87. // return
  88. //}
  89. //
  90. //w, err := c.conn.NextWriter(websocket.TextMessage)
  91. //if err != nil {
  92. // return
  93. //}
  94. //_, _ = w.Write(message)
  95. //
  96. //// Add queued chat messages to the current websocket message.
  97. //n := len(c.send)
  98. //for i := 0; i < n; i++ {
  99. // _, _ = w.Write(newline)
  100. // _, _ = w.Write(<-c.send)
  101. //}
  102. //
  103. //if err := w.Close(); err != nil {
  104. // return
  105. //}
  106. case <-ticker.C:
  107. if c.AccessToken == "" {
  108. logx.Error("accessToken is empty")
  109. continue
  110. }
  111. message := map[string]interface{}{
  112. "Id": 1001,
  113. "MsgType": "HeartBeatReq",
  114. "AccessToken": c.AccessToken,
  115. "Content": map[string]string{
  116. "token": c.AccessToken,
  117. },
  118. }
  119. //transportMessageJSON, err := json.Marshal(message)
  120. //if err != nil {
  121. // logx.Error(err)
  122. // continue
  123. //}
  124. //
  125. _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
  126. //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  127. err := c.Conn.WriteJSON(message)
  128. if err != nil {
  129. logx.Error(err)
  130. continue
  131. }
  132. logx.Info("发送心跳保活~~")
  133. }
  134. }
  135. }
  136. func (c *WecomWsClient) RegisterMessageHandler(handler MessageHandler) {
  137. if c == nil {
  138. logx.Error("WecomWsClient is nil in RegisterMessageHandler")
  139. return
  140. }
  141. logx.Info("Registering message handler for WecomWsClient:", c.Name)
  142. c.MessageHandler = append(c.MessageHandler, handler)
  143. }
  144. func (c *WecomWsClient) ReadPump() {
  145. defer func() {
  146. err := c.Conn.Close()
  147. if err != nil {
  148. return
  149. }
  150. }()
  151. //c.Conn.SetReadLimit(maxMessageSize)
  152. //err := c.Conn.SetReadDeadline(time.Time{})
  153. //if err != nil {
  154. // logx.Errorf("SetReadDeadline error: %v", err)
  155. // return
  156. //}
  157. //c.conn.SetPongHandler(func(string) error {
  158. // err := c.conn.SetReadDeadline(time.Time{})
  159. // if err != nil {
  160. // return err
  161. // }
  162. // return nil
  163. //})
  164. for {
  165. _, message, err := c.Conn.ReadMessage()
  166. if err != nil {
  167. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  168. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  169. } else {
  170. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  171. }
  172. break
  173. }
  174. message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
  175. logx.Debugf("message 获取的消息原型是 : %s", string(message))
  176. var msg MsgJsonObject
  177. //err = c.Conn.ReadJSON(msg)
  178. err = json.Unmarshal(message, &msg)
  179. if err != nil {
  180. logx.Error(err)
  181. continue
  182. }
  183. //logx.Info("message 获取的消息原型是 : ", msg)
  184. switch msg.MsgType {
  185. case "MsgReceivedAck":
  186. logx.Info("心跳确认消息已收到,无需处理")
  187. default:
  188. if len(c.MessageHandler) > 0 {
  189. for _, handler := range c.MessageHandler {
  190. // todo 重启时这里会丢消息,回头需要用wg来进行平滑处理
  191. go func() {
  192. defer func() {
  193. // 捕获panic, 防止程序崩溃
  194. if r := recover(); r != nil {
  195. logx.Error("Recovered in f", r)
  196. }
  197. }()
  198. err = handler(&msg)
  199. if err != nil {
  200. logx.Error(err)
  201. }
  202. }()
  203. }
  204. }
  205. }
  206. }
  207. }
  208. // DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅
  209. //
  210. //go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。
  211. func (c *WecomWsClient) DeviceAuth2() error {
  212. sendMsg := &wecom.DeviceAuthReqMessage{
  213. AuthType: wecom.DeviceAuthReqMessage_InternalCode,
  214. Credential: "",
  215. }
  216. content, err := anypb.New(sendMsg)
  217. if err != nil {
  218. logx.Error(err)
  219. return err
  220. }
  221. logx.Info("content is ", content)
  222. transportMessage := &wecom.TransportMessage{
  223. Id: int64(wecom.EnumMsgType_DeviceAuthReq),
  224. MsgType: wecom.EnumMsgType_DeviceAuthReq,
  225. Content: content,
  226. }
  227. transportMessageJSON, err := protojson.MarshalOptions{
  228. UseProtoNames: true,
  229. }.Marshal(transportMessage)
  230. if err != nil {
  231. logx.Error(err)
  232. return err
  233. }
  234. logx.Info(string(transportMessageJSON))
  235. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  236. if err != nil {
  237. logx.Error(err)
  238. return err
  239. }
  240. return nil
  241. }
  242. // DeviceAuth 连接认证,使用IP白名单方式进行认证
  243. func (c *WecomWsClient) DeviceAuth() error {
  244. message := map[string]interface{}{
  245. "Id": 1010,
  246. "AccessToken": "",
  247. "MsgType": "DeviceAuthReq",
  248. "Content": map[string]interface{}{
  249. "AuthType": 3,
  250. "Credential": "",
  251. },
  252. }
  253. transportMessageJSON, err := json.Marshal(message)
  254. if err != nil {
  255. logx.Error(err)
  256. return err
  257. }
  258. err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON)
  259. if err != nil {
  260. logx.Error(err)
  261. return err
  262. }
  263. for {
  264. _, msgByte, err := c.Conn.ReadMessage()
  265. if err != nil {
  266. if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
  267. logx.Errorf("IsUnexpectedCloseError error: %v", err)
  268. } else {
  269. logx.Errorf("NotIsUnexpectedCloseError error: %v", err)
  270. }
  271. break
  272. }
  273. msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1))
  274. logx.Debugf("message 获取的消息原型是 : %s", string(msgByte))
  275. var msg MsgJsonObject
  276. err = json.Unmarshal(msgByte, &msg)
  277. if err != nil {
  278. logx.Error(err)
  279. continue
  280. }
  281. if msg.MsgType == "Error" {
  282. var errMsg wecom.ErrorMessage
  283. _ = protojson.Unmarshal([]byte(msg.Message), &errMsg)
  284. logx.Error("连接认证失败,请检查IP白名单是否正确", errMsg)
  285. return errors.New("连接认证失败,请检查IP白名单是否正确")
  286. }
  287. if msg.MsgType != "DeviceAuthRsp" {
  288. logx.Error("不是连接认证消息,丢弃~", string(msgByte))
  289. continue
  290. }
  291. var deviceAuthRsp wecom.DeviceAuthRspMessage
  292. err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp)
  293. if err != nil {
  294. logx.Error(err)
  295. continue
  296. }
  297. logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken)
  298. c.AccessToken = deviceAuthRsp.AccessToken
  299. break
  300. }
  301. return nil
  302. }