package wechat_ws import ( "bytes" "encoding/json" "errors" "github.com/gorilla/websocket" "github.com/zeromicro/go-zero/core/logx" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/anypb" "time" "wechat-api/workphone" ) const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 30 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 4) / 10 // Maximum message size allowed from peer. maxMessageSize = 4096 ) var ( newline = []byte{'\n'} space = []byte{' '} ) type WechatWsClient struct { Conn *websocket.Conn AccessToken string Send chan []byte Name string CTypes string MessageHandler []MessageHandler } func NewWechatWsClient(urlStr string, name string, ctype string) (*WechatWsClient, error) { logx.Debug("实例开始") //p, _ := url.Parse("http://127.0.0.1:7897") var d = websocket.Dialer{ //Proxy: http.ProxyURL(&url.URL{ // Scheme: "http", // or "https" depending on your proxy // Host: "127.0.0.1:7897", // Path: "/", //}), } c, _, err := d.Dial(urlStr, nil) if err != nil { logx.Error(err) return nil, err } client := &WechatWsClient{ Conn: c, AccessToken: "", Send: make(chan []byte, 256), Name: name, CTypes: ctype, MessageHandler: make([]MessageHandler, 0), } err = client.DeviceAuth() if err != nil { return nil, err } return client, nil } // SendMsg 立刻发送消息,发送不成功可以获取错误信息 func (c *WechatWsClient) SendMsg(message []byte) error { logx.Info("发送消息:", string(message)) err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { return err } err = c.Conn.WriteMessage(websocket.TextMessage, message) if err != nil { return err } return nil } // SendMsgByChan 通过chan通道发送消息,这样可以保证发送顺序,异步,不返回错误 func (c *WechatWsClient) SendMsgByChan(msg []byte) { c.Send <- msg } func (c *WechatWsClient) WritePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() err := c.Conn.Close() if err != nil { return } }() for { select { case message, ok := <-c.Send: if !ok { continue } _ = c.SendMsg(message) // 以下注释 的,是把所有当前积压的消息一次性都发出去的写法,不确定是否会乱序,或服务端否支持 //_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait)) //if !ok { // _ = c.conn.WriteMessage(websocket.CloseMessage, []byte{}) // return //} // //w, err := c.conn.NextWriter(websocket.TextMessage) //if err != nil { // return //} //_, _ = w.Write(message) // //// Add queued chat messages to the current websocket message. //n := len(c.send) //for i := 0; i < n; i++ { // _, _ = w.Write(newline) // _, _ = w.Write(<-c.send) //} // //if err := w.Close(); err != nil { // return //} case <-ticker.C: if c.AccessToken == "" { logx.Error("accessToken is empty") continue } message := map[string]interface{}{ "Id": 1001, "MsgType": "HeartBeatReq", "AccessToken": c.AccessToken, "Content": map[string]string{ "token": c.AccessToken, }, } //transportMessageJSON, err := json.Marshal(message) //if err != nil { // logx.Error(err) // continue //} // _ = c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) //err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON) err := c.Conn.WriteJSON(message) if err != nil { logx.Error(err) continue } logx.Info("发送心跳保活~~") } } } func (c *WechatWsClient) RegisterMessageHandler(handler MessageHandler) { if c == nil { logx.Error("WechatWsClient is nil in RegisterMessageHandler") return } logx.Info("Registering message handler for WechatWsClient:", c.Name) c.MessageHandler = append(c.MessageHandler, handler) } func (c *WechatWsClient) ReadPump() { defer func() { err := c.Conn.Close() if err != nil { return } }() //c.Conn.SetReadLimit(maxMessageSize) //err := c.Conn.SetReadDeadline(time.Time{}) //if err != nil { // logx.Errorf("SetReadDeadline error: %v", err) // return //} //c.conn.SetPongHandler(func(string) error { // err := c.conn.SetReadDeadline(time.Time{}) // if err != nil { // return err // } // return nil //}) for { _, message, err := c.Conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { logx.Errorf("IsUnexpectedCloseError error: %v", err) } else { logx.Errorf("NotIsUnexpectedCloseError error: %v", err) } break } message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) logx.Debugf("message 获取的消息原型是 : %s", string(message)) var msg MsgJsonObject //err = c.Conn.ReadJSON(msg) err = json.Unmarshal(message, &msg) if err != nil { logx.Error(err) continue } //logx.Info("message 获取的消息原型是 : ", msg) switch msg.MsgType { case "MsgReceivedAck": logx.Info("心跳确认消息已收到,无需处理") default: if len(c.MessageHandler) > 0 { for _, handler := range c.MessageHandler { // todo 重启时这里会丢消息,回头需要用wg来进行平滑处理 go func() { defer func() { // 捕获panic, 防止程序崩溃 if r := recover(); r != nil { logx.Error("Recovered in f", r) } }() err = handler(&msg) if err != nil { logx.Error(err) } }() } } } } } // DeviceAuth2 todo 这个方法中,产生了一个@type的字段,无法通过注册.有空的时候看看,这种方式还是更优雅 // //go:deprecated 已弃用,请使用 DeviceAuth 方法进行设备认证。 func (c *WechatWsClient) DeviceAuth2() error { sendMsg := &workphone.DeviceAuthReqMessage{ AuthType: workphone.DeviceAuthReqMessage_InternalCode, Credential: "", } content, err := anypb.New(sendMsg) if err != nil { logx.Error(err) return err } logx.Info("content is ", content) transportMessage := &workphone.TransportMessage{ Id: int64(workphone.EnumMsgType_DeviceAuthReq), MsgType: workphone.EnumMsgType_DeviceAuthReq, Content: content, } transportMessageJSON, err := protojson.MarshalOptions{ UseProtoNames: true, }.Marshal(transportMessage) if err != nil { logx.Error(err) return err } logx.Info(string(transportMessageJSON)) err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON) if err != nil { logx.Error(err) return err } return nil } // DeviceAuth 连接认证,使用IP白名单方式进行认证 func (c *WechatWsClient) DeviceAuth() error { message := map[string]interface{}{ "Id": 1010, "AccessToken": "", "MsgType": "DeviceAuthReq", "Content": map[string]interface{}{ "AuthType": 3, "Credential": "", }, } transportMessageJSON, err := json.Marshal(message) if err != nil { logx.Error(err) return err } err = c.Conn.WriteMessage(websocket.TextMessage, transportMessageJSON) if err != nil { logx.Error(err) return err } for { _, msgByte, err := c.Conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { logx.Errorf("IsUnexpectedCloseError error: %v", err) } else { logx.Errorf("NotIsUnexpectedCloseError error: %v", err) } break } msgByte = bytes.TrimSpace(bytes.Replace(msgByte, newline, space, -1)) logx.Debugf("message 获取的消息原型是 : %s", string(msgByte)) var msg MsgJsonObject err = json.Unmarshal(msgByte, &msg) if err != nil { logx.Error(err) continue } if msg.MsgType == "Error" { var errMsg workphone.ErrorMessage _ = protojson.Unmarshal([]byte(msg.Message), &errMsg) logx.Error("连接认证失败,请检查IP白名单是否正确", errMsg) return errors.New("连接认证失败,请检查IP白名单是否正确") } if msg.MsgType != "DeviceAuthRsp" { logx.Error("不是连接认证消息,丢弃~", string(msgByte)) continue } var deviceAuthRsp workphone.DeviceAuthRspMessage err = protojson.Unmarshal([]byte(msg.Message), &deviceAuthRsp) if err != nil { logx.Error(err) continue } logx.Info("连接认证成功 accessToken :", deviceAuthRsp.AccessToken) c.AccessToken = deviceAuthRsp.AccessToken break } return nil }