import base64
import json
import os
import socket
import sys
# Must run before the ``workphone.*_pb2`` imports below.
# NOTE(review): presumably the generated protobuf modules import their
# siblings by bare module name and need this directory on sys.path - confirm.
sys.path.append(os.getcwd()+'/workphone')
import threading
import time
import uuid
from typing import Optional

from wcferry import WxMsg

from service.robot import Robot
from common.log import logger
from workphone.DeviceAuthReq_pb2 import DeviceAuthReqMessage
from workphone.DeviceAuthRsp_pb2 import DeviceAuthRspMessage
from workphone.ErrorMessage_pb2 import ErrorMessage
from workphone.TransportMessage_pb2 import TransportMessage, EnumMsgType, EnumContentType, EnumGender
from workphone.WeChatOnlineNotice_pb2 import WeChatOnlineNoticeMessage
from workphone.FriendTalkNotice_pb2 import FriendTalkNoticeMessage
from workphone.WeChatTalkToFriendNotice_pb2 import WeChatTalkToFriendNoticeMessage
from workphone.TalkToFriendTask_pb2 import TalkToFriendTaskMessage
from workphone.TalkToFriendTaskResultNotice_pb2 import TalkToFriendTaskResultNoticeMessage
from workphone.FriendAddNotice_pb2 import FriendMessage
from workphone.FriendPushNotice_pb2 import FriendPushNoticeMessage
from workphone.TriggerFriendPushTask_pb2 import TriggerFriendPushTaskMessage
from workphone.ChatRoomPushNotice_pb2 import ChatRoomPushNoticeMessage, ChatRoomMessage
from workphone.ConversationPushNotice_pb2 import ConversationPushNoticeMessage, ConversMessage
from workphone.HistoryMsgPushNotice_pb2 import HistoryMsgPushNoticeMessage, ChatMessage
from workphone.TriggerChatRoomPushTask_pb2 import TriggerChatRoomPushTaskMessage
from workphone.TriggerConversationPushTask_pb2 import TriggerConversationPushTaskMessage
from workphone.TriggerHistoryMsgPushTask_pb2 import TriggerHistoryMsgPushTaskMessage


class SocketClient:
    """TCP client that relays messages between a WeChat robot and a server.

    Every frame on the wire is a 4-byte big-endian length followed by a
    serialized ``TransportMessage``.  After ``start()`` two daemon threads
    run: ``register`` (device authentication, online notice, then a
    heartbeat every 30 seconds) and ``listen_for_messages`` (reads frames
    and dispatches server tasks to the handler methods below).
    """

    # Server endpoint; override on a subclass or instance to target another
    # deployment.
    SERVER_HOST = "chat.gkscrm.com"
    SERVER_PORT = 13087

    accessToken = None
    client_socket = None
    thread_register = None
    thread_onmessage = None
    status = False
    robot = None
    # Set by ``start()``; lets sleeping worker threads wake up on ``stop()``.
    _stop_event = None

    # ``Type`` column value of a MSG row -> protocol content type.
    _HISTORY_CONTENT_TYPES = {
        1: EnumContentType.Text,
        3: EnumContentType.Picture,
        34: EnumContentType.Voice,
        43: EnumContentType.Video,
        48: EnumContentType.Location,
        49: EnumContentType.File,
    }

    def __init__(self, robot: Optional[Robot]):
        """Store *robot* and, when one is given, connect immediately.

        Args:
            robot: The robot wrapper used to talk to WeChat.  When falsy the
                client is created but not started.
        """
        # send() is called from the register thread, the listener thread
        # and the robot's message callback; the lock keeps frames whole.
        self._send_lock = threading.Lock()
        self.robot = robot
        if robot:
            logger.info("[socket] 获取用户信息: %s", self.robot.user)
            self.start()

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def stop(self) -> None:
        """Stop both worker threads, close the socket and reset state.

        Safe to call when the client was never started.
        """
        self._signal_stop()
        sock = self.client_socket
        if sock is not None:
            try:
                # Unblocks a listener thread that is waiting inside recv();
                # without this the join() below could wait forever.
                sock.shutdown(socket.SHUT_RDWR)
            except OSError as exc:
                logger.debug("[socket] shutdown skipped: %s", exc)
        current = threading.current_thread()
        for worker in (self.thread_register, self.thread_onmessage):
            # A thread cannot join itself (RuntimeError), so skip it if
            # stop() happens to be called from one of the workers.
            if worker is not None and worker is not current:
                worker.join()
        if sock is not None:
            sock.close()
        logger.info("[socket] socket client closed")
        self.accessToken = None
        self.client_socket = None
        self.thread_register = None
        self.thread_onmessage = None

    def start(self) -> None:
        """Connect to the server and launch the register and listener threads.

        Raises:
            OSError: If the TCP connection cannot be established.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.SERVER_HOST, self.SERVER_PORT))
        except OSError:
            sock.close()  # do not leak the descriptor on a failed connect
            raise
        # TCP keep-alive is deliberately not configured: the client sends
        # its own heartbeat packets (see register()).
        self.client_socket = sock
        logger.info("[socket] socket client connected")
        self._stop_event = threading.Event()
        self.status = True
        # Register the device in a new thread; once registered it sends the
        # online notice and then heartbeats periodically.
        self.thread_register = threading.Thread(target=self.register, daemon=True)
        self.thread_register.start()
        # Listen for incoming messages.
        self.thread_onmessage = threading.Thread(target=self.listen_for_messages, daemon=True)
        self.thread_onmessage.start()

    def _signal_stop(self) -> None:
        """Mark the client as stopped and wake any sleeping worker thread."""
        self.status = False
        event = self._stop_event
        if event is not None:
            event.set()

    def _pause(self, seconds: float) -> None:
        """Sleep up to *seconds*, returning early once stop is signalled."""
        event = self._stop_event
        if event is None:
            time.sleep(seconds)
        else:
            event.wait(seconds)

    # ------------------------------------------------------------------
    # Receiving
    # ------------------------------------------------------------------
    def _recv_exact(self, size: int) -> bytes:
        """Read exactly *size* bytes from the socket.

        ``socket.recv`` may return fewer bytes than requested, so keep
        reading until the whole frame part has arrived.

        Raises:
            ConnectionError: If the peer closes the connection first.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.client_socket.recv(remaining)
            if not chunk:
                raise ConnectionError("socket closed by peer")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def listen_for_messages(self) -> None:
        """Read frames until the client stops or the connection is lost.

        A failure while handling one message is logged and the loop goes
        on; a socket error ends the loop and marks the client as stopped.
        """
        while self.status:
            try:
                header = self._recv_exact(4)
                response_length = int.from_bytes(header, byteorder='big')
                response_data = self._recv_exact(response_length)
            except OSError as exc:
                if self.status:
                    # Not triggered by stop(): the connection really died.
                    logger.error("[socket] connection lost: %s", exc)
                    self._signal_stop()
                break
            logger.info("[socket] 接收到的原始数据: %s", response_data)
            try:
                self._handle_frame(response_data)
            except Exception:
                # Thread-level boundary: one bad message must not kill the
                # listener thread.
                logger.exception("[socket] failed to handle incoming message")

    def _handle_frame(self, response_data: bytes) -> None:
        """Decode one ``TransportMessage``, acknowledge it and dispatch it."""
        message = TransportMessage()
        message.ParseFromString(response_data)
        logger.info("[socket] 接收到的 TransportMessage: %s", message)
        msg_type = message.MsgType

        # Acknowledge everything except acks and the auth response, once
        # an access token is available.
        if (msg_type != EnumMsgType.MsgReceivedAck
                and msg_type != EnumMsgType.DeviceAuthRsp
                and self.accessToken):
            ack = TransportMessage()
            ack.MsgType = EnumMsgType.MsgReceivedAck
            ack.AccessToken = self.accessToken
            ack.RefMessageId = message.Id
            self.send(ack)

        if msg_type == EnumMsgType.MsgReceivedAck:
            if message.AccessToken != self.accessToken:
                logger.error("[socket] 设备身份验证失败: accessToken 不匹配")
            else:
                logger.info("[socket] receive ack")
        elif msg_type == EnumMsgType.DeviceAuthRsp:
            device_auth_rsp = DeviceAuthRspMessage()
            message.Content.Unpack(device_auth_rsp)
            logger.info("[socket] 设备认证返回: %s", device_auth_rsp)
            self.accessToken = device_auth_rsp.AccessToken
            logger.info("[socket] 设备认证注册成功")
        elif msg_type == EnumMsgType.TriggerHistoryMsgPushTask:
            # Fetch chat history.
            task = TriggerHistoryMsgPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_history_msg_push_task_handle(task)
        elif msg_type == EnumMsgType.TriggerConversationPushTask:
            # Refresh the conversation list.
            task = TriggerConversationPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_conversation_push_task_handle(task)
        elif msg_type == EnumMsgType.TriggerFriendPushTask:
            # Refresh contacts.
            task = TriggerFriendPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_friend_push_task_handle(task)
        elif msg_type == EnumMsgType.TriggerChatroomPushTask:
            # Refresh group chats.
            task = TriggerChatRoomPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_chatroom_push_task_handle(task)
        elif msg_type == EnumMsgType.TalkToFriendTask:
            self._handle_talk_to_friend_task(message)
        elif msg_type == EnumMsgType.Error:
            err = ErrorMessage()
            message.Content.Unpack(err)
            logger.error("[socket] 错误信息: %s", err)

    def _handle_talk_to_friend_task(self, message: TransportMessage) -> None:
        """Send the message described by a TalkToFriendTask and report success."""
        task = TalkToFriendTaskMessage()
        message.Content.Unpack(task)
        if task.Remark:
            # Remark is used as a contact id whose name is @-mentioned.
            sender = self.robot.contacts.get(task.Remark, {})
            name = sender.get("name")
            if name is None:
                # Unknown contact: send the text without the mention rather
                # than failing on "@" + None.
                logger.warning("[socket] contact %s not found, sending without @", task.Remark)
            else:
                content = "@" + name + " " + task.Content.decode("utf-8")
                task.Content = content.encode("utf-8")
        logger.info("[socket] 需要发送的消息如下: %s", task)

        send_status = -1
        if task.ContentType == EnumContentType.Text:
            logger.info("[socket] 发送文本消息")
            send_status = self.robot.wcf.send_text(task.Content.decode("utf-8"), task.FriendId)
        elif task.ContentType == EnumContentType.Picture:
            logger.info("[socket] 发送图片消息")
            send_status = self.robot.wcf.send_image(task.Content.decode("utf-8"), task.FriendId)
        elif task.ContentType == EnumContentType.File:
            logger.info("[socket] 发送文件消息")
            file_content = json.loads(task.Content.decode("utf-8"))
            if "url" not in file_content:
                logger.error("[socket] 发送文件失败,请检查文件地址是否正确")
                return
            # The wcferry send-file API is broken; use send_image instead.
            send_status = self.robot.wcf.send_image(file_content.get("url", ""), task.FriendId)

        if send_status != 0:
            logger.error("[socket] message not sent, status=%s", send_status)
            return

        logger.info("[socket] 发送图片成功")
        # Keep the top 19 bits of a random UUID as a synthetic server id.
        msg_id = int(uuid.uuid4()) >> (128-19)
        notice = TalkToFriendTaskResultNoticeMessage()
        notice.Success = True
        notice.WeChatId = task.WeChatId
        notice.FriendId = task.FriendId
        notice.MsgId = task.MsgId
        notice.MsgSvrId = msg_id
        notice.CreateTime = int(time.time() * 1000)
        transport_message = TransportMessage()
        transport_message.MsgType = EnumMsgType.TalkToFriendTaskResultNotice
        transport_message.Content.Pack(notice)
        transport_message.AccessToken = self.accessToken
        self.send(transport_message)
        # NOTE(review): the previous code also built a
        # WeChatTalkToFriendNotice here but never sent it.  on_msg()
        # reports self-sent text messages, so sending it here might
        # duplicate them - confirm the intended behaviour before adding it.

    # ------------------------------------------------------------------
    # Registration / heartbeat
    # ------------------------------------------------------------------
    def register(self) -> None:
        """Authenticate the device, announce the account, then heartbeat.

        Re-sends the auth request every 5 seconds until an access token has
        been received, enables the robot's message callback, sends the
        online notice and finally sends a heartbeat every 30 seconds until
        the client stops.
        """
        try:
            self._register_and_heartbeat()
        except OSError:
            if self.status:
                logger.exception("[socket] register/heartbeat thread lost the connection")

    def _register_and_heartbeat(self) -> None:
        """Body of ``register``; socket errors propagate to the caller."""
        device_code = "PC-" + str(uuid.getnode())
        while self.accessToken is None and self.status:
            # Build the device-auth request.
            device_auth = DeviceAuthReqMessage()
            device_auth.AuthType = DeviceAuthReqMessage.EnumAuthType.DeviceCode
            device_auth.Credential = device_code
            logger.info("[socket] 设备认证数据: %s", device_auth)
            transport_message = TransportMessage()
            transport_message.MsgType = EnumMsgType.DeviceAuthReq
            transport_message.Content.Pack(device_auth)
            self.send(transport_message)
            self._pause(5)

        if not self.status:
            return

        # Start forwarding WeChat messages to on_msg.
        self.robot.enableReceivingMsgCallback(self.on_msg)

        # Online notice.
        user = self.robot.user
        online = WeChatOnlineNoticeMessage()
        online.WeChatId = user.get('wxid')
        online.WeChatNo = user.get('code', "")
        online.WeChatNick = user.get('name', "")
        online.Gender = self._gender_of(user.get('gender'))
        online.Country = user.get('country', "")
        online.Province = user.get('province', "")
        online.City = user.get('city', "")
        online.Avatar = user.get('avatar', '')
        online.IMEI = device_code
        online.Phone = user.get('mobile', "")
        transport_message = TransportMessage()
        transport_message.MsgType = EnumMsgType.WeChatOnlineNotice
        transport_message.Content.Pack(online)
        transport_message.AccessToken = self.accessToken
        self.send(transport_message)

        # Heartbeat loop.
        while self.status:
            transport_message = TransportMessage()
            transport_message.MsgType = EnumMsgType.HeartBeatReq
            transport_message.AccessToken = self.accessToken
            self.send(transport_message)
            self._pause(30)

    @staticmethod
    def _gender_of(value):
        """Map the contact ``gender`` string to an ``EnumGender`` value."""
        if value == "男":
            return EnumGender.Male
        if value == "女":
            return EnumGender.Female
        return EnumGender.UnknownGender

    # ------------------------------------------------------------------
    # WeChat -> server
    # ------------------------------------------------------------------
    def on_msg(self, msg: WxMsg):
        """Forward a received WeChat text message to the server.

        Only text messages are forwarded.  Messages sent by the logged-in
        account go out as ``WeChatTalkToFriendNotice``; all others go out
        as ``FriendTalkNotice``.
        """
        # Dump the message as JSON for the log.
        logger.info("收到消息: %s", json.dumps(msg.__dict__, ensure_ascii=False, indent=4))
        if not msg.is_text():
            return

        # NOTE(review): CreateTime is sent as msg.ts unchanged, whereas the
        # other notices in this class send milliseconds - confirm the unit.
        transport_message = TransportMessage()
        if msg.from_self():
            notice = WeChatTalkToFriendNoticeMessage()
            notice.WeChatId = msg.sender
            notice.FriendId = msg.roomid
            notice.ContentType = EnumContentType.Text
            notice.Content = str(msg.content).encode('utf-8')
            notice.msgSvrId = msg.id
            notice.CreateTime = msg.ts
            logger.info("准备发送的消息体: %s", notice)
            transport_message.MsgType = EnumMsgType.WeChatTalkToFriendNotice
        else:
            logger.info('获取好友信息 start')
            sender = self.robot.contacts.get(msg.sender, {})
            logger.info('获取好友信息 end: %s', sender)
            logger.info("好友信息: %s", json.dumps(sender, ensure_ascii=False, indent=4))
            notice = FriendTalkNoticeMessage()
            notice.WeChatId = self.robot.user.get('wxid')
            notice.ContentType = EnumContentType.Text
            notice.msgSvrId = msg.id
            notice.CreateTime = msg.ts
            if msg.from_group():
                notice.FriendId = msg.roomid
                notice.Content = (msg.sender + ":\n" + str(msg.content)).encode('utf-8')
            else:
                notice.FriendId = msg.sender
                notice.Content = str(msg.content).encode('utf-8')
            # Unknown contacts have no name; a protobuf string field
            # rejects None.
            notice.NickName = sender.get('name') or ""
            logger.info("准备发送的消息体: %s", notice)
            transport_message.MsgType = EnumMsgType.FriendTalkNotice

        transport_message.Content.Pack(notice)
        transport_message.AccessToken = self.accessToken
        logger.info("[socket] 发送消息: %s", transport_message)
        self.send(transport_message)

    # ------------------------------------------------------------------
    # Server-triggered push tasks
    # ------------------------------------------------------------------
    @staticmethod
    def _group_sender(bytes_extra) -> str:
        """Return the value at ``['3'][0]['2']`` of a decoded BytesExtra.

        Returns an empty string when that path is missing or not a string.
        """
        try:
            sender = bytes_extra.get('3')[0].get('2')
        except (AttributeError, IndexError, KeyError, TypeError):
            return ""
        return sender if isinstance(sender, str) else ""

    def trigger_history_msg_push_task_handle(self, task: TriggerHistoryMsgPushTaskMessage):
        """Query the latest messages of one conversation and push them."""
        logger.info("收到历史消息推送任务: %s", task)
        notice = HistoryMsgPushNoticeMessage()
        notice.WeChatId = task.WeChatId
        notice.TaskId = task.TaskId
        # query_sql is called with a plain SQL string here, so escape the
        # quote character of the server-supplied id and force the limit
        # to int.
        friend_id = task.FriendId.replace("'", "''")
        sql = ("select * from MSG where StrTalker='" + friend_id
               + "' order by localId DESC limit " + str(int(task.Count)))
        messages_list = self.robot.wcf.query_sql("MSG2.db", sql)
        is_group = 'chatroom' in task.FriendId
        for msg in messages_list:
            logger.info("消息: %s", msg)
            text = msg.get('StrContent') or ''
            chat_message = ChatMessage()
            chat_message.FriendId = task.FriendId
            chat_message.ContentType = self._HISTORY_CONTENT_TYPES.get(
                msg.get('Type'), EnumContentType.UnSupport)
            chat_message.MsgId = msg.get('localId')
            chat_message.MsgSvrId = msg.get('MsgSvrID')
            chat_message.CreateTime = msg.get('CreateTime') * 1000
            chat_message.IsSend = msg.get('IsSender') == 1
            if is_group and not chat_message.IsSend:
                # Incoming group message: prefix the sender taken from
                # the decoded BytesExtra blob.
                extra = self.robot.get_BytesExtra(msg.get('BytesExtra'))
                sender = self._group_sender(extra)
                if sender:
                    text = sender + ":" + text
            chat_message.Content = text.encode('utf-8')
            notice.Messages.append(chat_message)
        notice.Size = 50
        notice.Count = len(messages_list)
        transport_message = TransportMessage()
        transport_message.MsgType = EnumMsgType.HistoryMsgPushNotice
        transport_message.Content.Pack(notice)
        transport_message.AccessToken = self.accessToken
        self.send(transport_message)

    def trigger_conversation_push_task_handle(self, task: TriggerConversationPushTaskMessage):
        """Push up to 500 sessions updated after ``task.StartTime``."""
        logger.info("收到会话推送任务: %s", task)
        notice = ConversationPushNoticeMessage()
        notice.WeChatId = task.WeChatId
        notice.TaskId = task.TaskId
        # StartTime is divided by 1000 before being compared with nTime.
        start_time = int(task.StartTime / 1000)
        sessions = self.robot.wcf.query_sql(
            "MicroMsg.db",
            "select * from Session where nTime > " + str(start_time)
            + " and nMsgType != 49 and nMsgType != 0 order by nTime DESC limit 500")
        contacts = self.robot.contacts
        for session in sessions:
            logger.info("会话: %s", session)
            user_name = session.get('strUsrName', '')
            conver = ConversMessage()
            # ``or`` fallbacks: protobuf scalar fields reject None.
            conver.UserName = user_name or ''
            conver.Digest = session.get('strContent') or ''
            conver.UnreadCnt = session.get('nUnReadCount') or 0
            conver.UpdateTime = session.get('nTime') * 1000
            conver.ShowName = session.get('strNickName') or ''
            if user_name in contacts:
                conver.Avatar = contacts.get(user_name).get('avatar', '') or ''
            notice.Convers.append(conver)
        notice.Size = 500
        notice.Count = len(sessions)
        transport_message = TransportMessage()
        transport_message.MsgType = EnumMsgType.ConversationPushNotice
        transport_message.Content.Pack(notice)
        transport_message.AccessToken = self.accessToken
        self.send(transport_message)

    @staticmethod
    def _split_display_names(raw: str, sep: str = '^G') -> list:
        """Split *raw* on *sep* after dropping leading/trailing separators.

        ``str.strip('^G')`` would strip the *characters* ``^`` and ``G``
        and so eat a leading or trailing "G" of a real name; this removes
        only whole separators.
        """
        while raw.startswith(sep):
            raw = raw[len(sep):]
        while raw.endswith(sep):
            raw = raw[:-len(sep)]
        return raw.split(sep)

    def trigger_chatroom_push_task_handle(self, trigger_chatroom_push_task: TriggerChatRoomPushTaskMessage):
        """Push every contact whose id contains ``chatroom`` as a chat room."""
        logger.info("收到群推送任务: %s", trigger_chatroom_push_task)
        chatroom_push_notice_message = ChatRoomPushNoticeMessage()
        chatroom_push_notice_message.WeChatId = trigger_chatroom_push_task.WeChatId
        chatroom_push_notice_message.TaskId = trigger_chatroom_push_task.TaskId
        for wxid, contact in self.robot.contacts.items():
            if 'chatroom' not in wxid:
                logger.info("非群组不推送: %s", wxid)
                continue
            logger.info("群组:%s", contact)
            chatroom_message = ChatRoomMessage()
            chatroom_message.UserName = wxid
            chatroom_message.NickName = contact.get('name') or ''
            chatroom_message.Avatar = contact.get('avatar', '') or ''
            owner = contact.get('Reserved2')
            if isinstance(owner, str):
                chatroom_message.Owner = owner
            # NOTE(review): members are split on '^' while display names
            # are split on '^G' - confirm both separators against the data.
            for username in (contact.get('UserNameList', '') or '').split('^'):
                chatroom_message.MemberList.append(username)
            display_name_list = self._split_display_names(
                contact.get('DisplayNameList', '') or '')
            for index, member in enumerate(chatroom_message.MemberList):
                display = ChatRoomMessage.DisplayNameMessage()
                display.UserName = member
                if index < len(display_name_list):
                    display.ShowName = display_name_list[index]
                    display.Flag = 1
                chatroom_message.ShowNameList.append(display)
            chatroom_push_notice_message.ChatRooms.append(chatroom_message)
        room_count = len(chatroom_push_notice_message.ChatRooms)
        chatroom_push_notice_message.Count = room_count
        chatroom_push_notice_message.Size = room_count
        transport_message = TransportMessage()
        transport_message.MsgType = EnumMsgType.ChatroomPushNotice
        transport_message.Content.Pack(chatroom_push_notice_message)
        transport_message.AccessToken = self.accessToken
        self.send(transport_message)

    def trigger_friend_push_task_handle(self, trigger_friend_push_task: TriggerFriendPushTaskMessage):
        """Push all personal contacts (no groups, openim or ``gh_`` ids)."""
        logger.info("收到好友推送任务: %s", trigger_friend_push_task)
        friend_push_notice_message = FriendPushNoticeMessage()
        friend_push_notice_message.WeChatId = trigger_friend_push_task.WeChatId
        friend_push_notice_message.TaskId = trigger_friend_push_task.TaskId
        for wxid, contact in self.robot.contacts.items():
            if ('@chatroom' in wxid or '@openim' in wxid
                    or '@im.chatroom' in wxid or wxid.startswith("gh_")):
                logger.info("群组/企业微信/公众号 不推送: %s", wxid)
                continue
            if not contact.get('Type'):
                continue
            friend_message = FriendMessage()
            friend_message.FriendId = wxid
            # ``or ""``: protobuf string fields reject None for missing keys.
            friend_message.FriendNo = contact.get('code') or ""
            friend_message.FriendNick = contact.get('name') or ""
            friend_message.Gender = self._gender_of(contact.get('gender'))
            friend_message.Avatar = contact.get('avatar', '') or ""
            friend_message.Country = contact.get('country') or ""
            friend_message.Province = contact.get('province') or ""
            friend_message.City = contact.get('city') or ""
            friend_message.Remark = contact.get('remark') or ""
            friend_message.Memo = contact.get('remark') or ""
            friend_message.Type = contact.get('Type')
            friend_push_notice_message.Friends.append(friend_message)
        friend_count = len(friend_push_notice_message.Friends)
        friend_push_notice_message.Count = friend_count
        friend_push_notice_message.Size = friend_count
        transport_message = TransportMessage()
        transport_message.MsgType = EnumMsgType.FriendPushNotice
        transport_message.Content.Pack(friend_push_notice_message)
        transport_message.AccessToken = self.accessToken
        self.send(transport_message)

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------
    def send(self, transport_message: TransportMessage):
        """Serialize *transport_message* and write it as one framed packet.

        The frame is a 4-byte big-endian length followed by the serialized
        message.  Writes are serialized with a lock because several threads
        share the socket.

        Raises:
            OSError: If the socket write fails.
        """
        serialized_data = transport_message.SerializeToString()
        frame = len(serialized_data).to_bytes(4, byteorder='big') + serialized_data
        with self._send_lock:
            self.client_socket.sendall(frame)