123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563 |
- import base64
- import json
- import os
- import socket
- import sys
- sys.path.append(os.getcwd()+'/workphone')
- import threading
- import time
- import uuid
- from typing import Optional
- from wcferry import WxMsg
- from service.robot import Robot
- from common.log import logger
- from workphone.DeviceAuthReq_pb2 import DeviceAuthReqMessage
- from workphone.DeviceAuthRsp_pb2 import DeviceAuthRspMessage
- from workphone.ErrorMessage_pb2 import ErrorMessage
- from workphone.TransportMessage_pb2 import TransportMessage, EnumMsgType, EnumContentType, EnumGender
- from workphone.WeChatOnlineNotice_pb2 import WeChatOnlineNoticeMessage
- from workphone.FriendTalkNotice_pb2 import FriendTalkNoticeMessage
- from workphone.WeChatTalkToFriendNotice_pb2 import WeChatTalkToFriendNoticeMessage
- from workphone.TalkToFriendTask_pb2 import TalkToFriendTaskMessage
- from workphone.TalkToFriendTaskResultNotice_pb2 import TalkToFriendTaskResultNoticeMessage
- from workphone.FriendAddNotice_pb2 import FriendMessage
- from workphone.FriendPushNotice_pb2 import FriendPushNoticeMessage
- from workphone.TriggerFriendPushTask_pb2 import TriggerFriendPushTaskMessage
- from workphone.ChatRoomPushNotice_pb2 import ChatRoomPushNoticeMessage, ChatRoomMessage
- from workphone.ConversationPushNotice_pb2 import ConversationPushNoticeMessage, ConversMessage
- from workphone.HistoryMsgPushNotice_pb2 import HistoryMsgPushNoticeMessage, ChatMessage
- from workphone.TriggerChatRoomPushTask_pb2 import TriggerChatRoomPushTaskMessage
- from workphone.TriggerConversationPushTask_pb2 import TriggerConversationPushTaskMessage
- from workphone.TriggerHistoryMsgPushTask_pb2 import TriggerHistoryMsgPushTaskMessage
class SocketClient:
    """TCP client that relays WeChat robot traffic to a remote chat server.

    Every frame on the wire is a 4-byte big-endian length followed by a
    serialized ``TransportMessage``. Two daemon threads are started by
    ``start()``: one authenticates the device, announces the account as
    online and then sends heartbeats; the other reads frames from the
    socket and dispatches them by ``MsgType``.
    """

    # Remote endpoint used by start(); override on a subclass or instance
    # to point the client at another server.
    server_address = ("chat.gkscrm.com", 13087)

    accessToken = None
    client_socket = None
    thread_register = None
    thread_onmessage = None
    status = False
    robot = None
    # send() is called from several threads at once; the lock keeps frames
    # from interleaving. __init__ installs a per-instance lock, this shared
    # one is only a fallback for instances created without __init__.
    _send_lock = threading.Lock()

    def __init__(self, robot: "Optional[Robot]"):
        """Store ``robot`` and, when one is given, connect immediately."""
        self._send_lock = threading.Lock()
        self.robot = robot
        if robot:
            logger.info("[socket] 获取用户信息: %s", self.robot.user)
            self.start()

    def stop(self):
        """Stop both worker threads, close the socket and reset the state.

        The socket is shut down *before* joining so that the listener
        thread, which may be blocked in ``recv``, wakes up instead of
        blocking the join. Safe to call when ``start()`` never ran.
        """
        self.status = False
        sock = self.client_socket
        if sock is not None:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                # The connection is already gone; close() below is enough.
                pass
            sock.close()
            logger.info("[socket] socket client closed")
        current = threading.current_thread()
        for worker in (self.thread_register, self.thread_onmessage):
            # Never join the calling thread itself (would raise RuntimeError).
            if worker is not None and worker is not current:
                worker.join()
        self.accessToken = None
        self.client_socket = None
        self.thread_register = None
        self.thread_onmessage = None

    def start(self):
        """Connect to ``server_address`` and launch the two worker threads.

        Raises:
            OSError: if the TCP connection cannot be established.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(self.server_address)
        except OSError:
            sock.close()
            raise
        self.client_socket = sock
        # TCP keep-alive is intentionally not enabled: the heartbeat sent by
        # register() is used instead.
        logger.info("[socket] socket client connected")
        self.status = True
        # Registration thread: device auth, online notice, then heartbeats.
        self.thread_register = threading.Thread(target=self.register, daemon=True)
        self.thread_register.start()
        # Listener thread: reads and dispatches incoming frames.
        self.thread_onmessage = threading.Thread(target=self.listen_for_messages, daemon=True)
        self.thread_onmessage.start()

    def _recv_exact(self, size):
        """Read exactly ``size`` bytes from the socket.

        Returns the bytes read, or ``None`` if the peer closed the
        connection before ``size`` bytes arrived.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = self.client_socket.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _build_envelope(self, msg_type, body=None, with_token=True):
        """Create a ``TransportMessage`` of ``msg_type`` wrapping ``body``.

        The access token is attached only when ``with_token`` is true and a
        token has been obtained.
        """
        envelope = TransportMessage()
        envelope.MsgType = msg_type
        if body is not None:
            envelope.Content.Pack(body)
        if with_token and self.accessToken:
            envelope.AccessToken = self.accessToken
        return envelope

    @staticmethod
    def _gender_of(label):
        """Map the contact's gender label ("男"/"女") to ``EnumGender``."""
        if label == "男":
            return EnumGender.Male
        if label == "女":
            return EnumGender.Female
        return EnumGender.UnknownGender

    @staticmethod
    def _sql_quote(value):
        """Return ``value`` as a single-quoted SQL string literal."""
        return "'" + str(value).replace("'", "''") + "'"

    @staticmethod
    def _group_sender(bytes_extra):
        """Extract the sender id from parsed ``BytesExtra`` data.

        Reads ``bytes_extra['3'][0]['2']``; returns ``""`` when that path
        is missing or is not a string.
        """
        try:
            sender = bytes_extra.get('3')[0].get('2')
        except (AttributeError, IndexError, KeyError, TypeError):
            return ""
        return sender if isinstance(sender, str) else ""

    def listen_for_messages(self):
        """Read length-prefixed frames until stopped or disconnected.

        A failure while handling one message is logged and does not stop
        the loop; a closed or broken connection clears ``status`` and ends
        the thread.
        """
        while self.status:
            try:
                header = self._recv_exact(4)
                payload = None
                if header is not None:
                    payload = self._recv_exact(int.from_bytes(header, byteorder='big'))
            except OSError as exc:
                # Expected when stop() closed the socket; only report it
                # if the client was still supposed to be running.
                if self.status:
                    logger.error("[socket] 接收数据失败: %s", exc)
                    self.status = False
                break
            if payload is None:
                if self.status:
                    logger.error("[socket] 连接已被服务端关闭")
                    self.status = False
                break
            logger.info("[socket] 接收到的原始数据: %s", payload)
            try:
                message = TransportMessage()
                message.ParseFromString(payload)
                logger.info("[socket] 接收到的 TransportMessage: %s", message)
                self._handle_message(message)
            except Exception:
                # Thread boundary: one bad message must not kill the listener.
                logger.exception("[socket] 处理消息失败")

    def _handle_message(self, message):
        """Acknowledge ``message`` if required and dispatch it by type."""
        msg_type = message.MsgType
        needs_ack = (msg_type != EnumMsgType.MsgReceivedAck
                     and msg_type != EnumMsgType.DeviceAuthRsp)
        if needs_ack and self.accessToken:
            ack = TransportMessage()
            ack.MsgType = EnumMsgType.MsgReceivedAck
            ack.AccessToken = self.accessToken
            ack.RefMessageId = message.Id
            self.send(ack)

        if msg_type == EnumMsgType.MsgReceivedAck:
            if message.AccessToken != self.accessToken:
                logger.error("[socket] 设备身份验证失败: accessToken 不匹配")
            else:
                logger.info("[socket] receive ack")
        elif msg_type == EnumMsgType.DeviceAuthRsp:
            device_auth_rsp = DeviceAuthRspMessage()
            message.Content.Unpack(device_auth_rsp)
            logger.info("[socket] 设备认证返回: %s", device_auth_rsp)
            self.accessToken = device_auth_rsp.AccessToken
            logger.info("[socket] 设备认证注册成功")
        elif msg_type == EnumMsgType.TriggerHistoryMsgPushTask:
            # Fetch chat history.
            task = TriggerHistoryMsgPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_history_msg_push_task_handle(task)
        elif msg_type == EnumMsgType.TriggerConversationPushTask:
            # Refresh the conversation list.
            task = TriggerConversationPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_conversation_push_task_handle(task)
        elif msg_type == EnumMsgType.TriggerFriendPushTask:
            # Refresh contacts.
            task = TriggerFriendPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_friend_push_task_handle(task)
        elif msg_type == EnumMsgType.TriggerChatroomPushTask:
            # Refresh chat rooms.
            task = TriggerChatRoomPushTaskMessage()
            message.Content.Unpack(task)
            self.trigger_chatroom_push_task_handle(task)
        elif msg_type == EnumMsgType.TalkToFriendTask:
            task = TalkToFriendTaskMessage()
            message.Content.Unpack(task)
            self._handle_talk_to_friend_task(task)
        elif msg_type == EnumMsgType.Error:
            err = ErrorMessage()
            message.Content.Unpack(err)
            logger.error("[socket] 错误信息: %s", err)

    def _handle_talk_to_friend_task(self, task):
        """Send the text/image/file described by ``task`` through the robot.

        On success (send status 0) a ``TalkToFriendTaskResultNotice`` is
        reported to the server; any other status is logged as a failure.
        """
        if task.Remark:
            # Remark is used as a key into the robot's contacts; the
            # contact's name is prepended as an "@name " mention.
            mentioned = self.robot.contacts.get(task.Remark, {})
            mention_name = mentioned.get("name")
            if mention_name:
                text = "@" + mention_name + " " + task.Content.decode("utf-8")
                task.Content = text.encode("utf-8")
            else:
                logger.warning("[socket] 未找到被@的联系人: %s", task.Remark)
        logger.info("[socket] 需要发送的消息如下: %s", task)

        send_status = -1
        if task.ContentType == EnumContentType.Text:
            logger.info("[socket] 发送文本消息")
            send_status = self.robot.wcf.send_text(task.Content.decode("utf-8"), task.FriendId)
        elif task.ContentType == EnumContentType.Picture:
            logger.info("[socket] 发送图片消息")
            send_status = self.robot.wcf.send_image(task.Content.decode("utf-8"), task.FriendId)
        elif task.ContentType == EnumContentType.File:
            logger.info("[socket] 发送文件消息")
            file_content = json.loads(task.Content.decode("utf-8"))
            if "url" not in file_content:
                logger.error("[socket] 发送文件失败,请检查文件地址是否正确")
                return
            # The wcferry file-sending API is broken, so the image-sending
            # API is used as a substitute.
            send_status = self.robot.wcf.send_image(file_content.get("url", ""), task.FriendId)

        if send_status != 0:
            logger.error("[socket] 发送消息失败, status: %s", send_status)
            return
        logger.info("[socket] 发送消息成功")

        result = TalkToFriendTaskResultNoticeMessage()
        result.Success = True
        result.WeChatId = task.WeChatId
        result.FriendId = task.FriendId
        result.MsgId = task.MsgId
        # Random 19-bit id standing in for the server-side message id.
        result.MsgSvrId = int(uuid.uuid4()) >> (128 - 19)
        result.CreateTime = int(time.time() * 1000)
        self.send(self._build_envelope(EnumMsgType.TalkToFriendTaskResultNotice, result))
        # NOTE(review): the previous code also built a WeChatTalkToFriendNotice
        # here but never sent it. on_msg() reports self-sent text messages
        # with that message type, so the unsent copy was dropped — confirm
        # that no extra notice is expected from this path.

    def register(self):
        """Authenticate the device, announce the account, then heartbeat.

        Re-sends the device auth request every 5 seconds until the listener
        stores an access token, then enables the robot's message callback,
        sends the online notice and emits a heartbeat every 30 seconds
        while ``status`` is true. Exits when sending fails.
        """
        device_code = "PC-" + str(uuid.getnode())
        try:
            while self.accessToken is None and self.status:
                device_auth = DeviceAuthReqMessage()
                device_auth.AuthType = DeviceAuthReqMessage.EnumAuthType.DeviceCode
                device_auth.Credential = device_code
                logger.info("[socket] 设备认证数据: %s", device_auth)
                self.send(self._build_envelope(EnumMsgType.DeviceAuthReq, device_auth,
                                               with_token=False))
                time.sleep(5)
            if not self.status:
                return

            # Start forwarding robot messages to on_msg.
            self.robot.enableReceivingMsgCallback(self.on_msg)

            # Online notice.
            profile = self.robot.user
            online = WeChatOnlineNoticeMessage()
            online.WeChatId = profile.get('wxid') or ""
            online.WeChatNo = profile.get('code') or ""
            online.WeChatNick = profile.get('name') or ""
            online.Gender = self._gender_of(profile.get('gender'))
            online.Country = profile.get('country') or ""
            online.Province = profile.get('province') or ""
            online.City = profile.get('city') or ""
            online.Avatar = profile.get('avatar') or ""
            online.IMEI = device_code
            online.Phone = profile.get('mobile') or ""
            self.send(self._build_envelope(EnumMsgType.WeChatOnlineNotice, online))

            # Heartbeat loop.
            while self.status:
                self.send(self._build_envelope(EnumMsgType.HeartBeatReq))
                time.sleep(30)
        except OSError as exc:
            # Expected once stop() has closed the socket.
            if self.status:
                logger.error("[socket] 发送失败, 注册/心跳线程退出: %s", exc)

    def on_msg(self, msg: "WxMsg"):
        """Forward a text message received by the robot to the server.

        Self-sent messages are reported as ``WeChatTalkToFriendNotice``,
        all others as ``FriendTalkNotice``. Non-text messages are only
        logged.
        """
        logger.info("收到消息: %s", json.dumps(msg.__dict__, ensure_ascii=False, indent=4))
        if not msg.is_text():
            return

        # NOTE(review): msg.ts is forwarded unchanged as CreateTime, while
        # the other notices in this class send milliseconds — confirm unit.
        if msg.from_self():
            notice = WeChatTalkToFriendNoticeMessage()
            notice.WeChatId = msg.sender
            notice.FriendId = msg.roomid
            notice.ContentType = EnumContentType.Text
            notice.Content = str(msg.content).encode('utf-8')
            notice.msgSvrId = msg.id
            notice.CreateTime = msg.ts
            msg_type = EnumMsgType.WeChatTalkToFriendNotice
        else:
            logger.info('获取好友信息 start')
            sender = self.robot.contacts.get(msg.sender, {})
            logger.info('获取好友信息 end: %s', sender)
            logger.info("好友信息: %s", json.dumps(sender, ensure_ascii=False, indent=4))
            notice = FriendTalkNoticeMessage()
            notice.WeChatId = self.robot.user.get('wxid') or ""
            notice.ContentType = EnumContentType.Text
            notice.msgSvrId = msg.id
            notice.CreateTime = msg.ts
            if msg.from_group():
                notice.FriendId = msg.roomid
                notice.Content = (msg.sender + ":\n" + str(msg.content)).encode('utf-8')
            else:
                notice.FriendId = msg.sender
                notice.Content = str(msg.content).encode('utf-8')
                # NOTE(review): NickName is set for one-to-one chats only;
                # confirm it is not also expected for group messages.
                notice.NickName = sender.get('name') or ""
            msg_type = EnumMsgType.FriendTalkNotice

        logger.info("准备发送的消息体: %s", notice)
        transport_message = self._build_envelope(msg_type, notice)
        logger.info("[socket] 发送消息: %s", transport_message)
        try:
            self.send(transport_message)
        except OSError as exc:
            # Do not let a dead connection break the robot's callback.
            logger.error("[socket] 发送消息失败: %s", exc)

    def trigger_history_msg_push_task_handle(self, task: "TriggerHistoryMsgPushTaskMessage"):
        """Reply with the latest ``task.Count`` messages of ``task.FriendId``.

        Rows are read from the ``MSG`` table of ``MSG2.db`` via the robot.
        For received messages in a chat room the sender id taken from
        ``BytesExtra`` is prefixed to the content as ``"sender:content"``.
        """
        logger.info("收到历史消息推送任务: %s", task)
        content_types = {
            1: EnumContentType.Text,
            3: EnumContentType.Picture,
            34: EnumContentType.Voice,
            43: EnumContentType.Video,
            48: EnumContentType.Location,
            49: EnumContentType.File,
        }
        notice = HistoryMsgPushNoticeMessage()
        notice.WeChatId = task.WeChatId
        notice.TaskId = task.TaskId
        # FriendId arrives from the server, so quote it as a SQL literal.
        sql = ("select * from MSG where StrTalker=" + self._sql_quote(task.FriendId)
               + " order by localId DESC limit " + str(int(task.Count)))
        messages_list = self.robot.wcf.query_sql("MSG2.db", sql)
        in_chatroom = 'chatroom' in task.FriendId
        for row in messages_list:
            logger.info("消息: %s", row)
            text = row.get('StrContent') or ''
            chat_message = ChatMessage()
            chat_message.FriendId = task.FriendId
            chat_message.ContentType = content_types.get(row.get('Type'),
                                                         EnumContentType.UnSupport)
            chat_message.MsgId = row.get('localId')
            chat_message.MsgSvrId = row.get('MsgSvrID')
            chat_message.CreateTime = row.get('CreateTime') * 1000
            chat_message.IsSend = row.get('IsSender') == 1
            if in_chatroom and not chat_message.IsSend:
                extra = self.robot.get_BytesExtra(row.get('BytesExtra'))
                text = self._group_sender(extra) + ":" + text
            chat_message.Content = text.encode('utf-8')
            notice.Messages.append(chat_message)
        notice.Size = 50
        notice.Count = len(messages_list)
        self.send(self._build_envelope(EnumMsgType.HistoryMsgPushNotice, notice))

    def trigger_conversation_push_task_handle(self, task: "TriggerConversationPushTaskMessage"):
        """Reply with up to 500 sessions updated after ``task.StartTime``.

        ``task.StartTime`` is divided by 1000 before being compared with
        the ``nTime`` column of the ``Session`` table in ``MicroMsg.db``.
        """
        logger.info("收到会话推送任务: %s", task)
        notice = ConversationPushNoticeMessage()
        notice.WeChatId = task.WeChatId
        notice.TaskId = task.TaskId
        start_time = int(task.StartTime / 1000)
        sessions = self.robot.wcf.query_sql(
            "MicroMsg.db",
            "select * from Session where nTime > " + str(start_time)
            + " and nMsgType != 49 and nMsgType != 0 order by nTime DESC limit 500")
        for session in sessions:
            logger.info("会话: %s", session)
            username = session.get('strUsrName') or ''
            conver = ConversMessage()
            conver.UserName = username
            conver.Digest = session.get('strContent') or ''
            conver.UnreadCnt = session.get('nUnReadCount') or 0
            conver.UpdateTime = (session.get('nTime') or 0) * 1000
            conver.ShowName = session.get('strNickName') or ''
            contact = self.robot.contacts.get(username)
            if contact:
                conver.Avatar = contact.get('avatar') or ''
            notice.Convers.append(conver)
        notice.Size = 500
        notice.Count = len(sessions)
        self.send(self._build_envelope(EnumMsgType.ConversationPushNotice, notice))

    def trigger_chatroom_push_task_handle(self, trigger_chatroom_push_task: "TriggerChatRoomPushTaskMessage"):
        """Reply with every contact whose id contains ``'chatroom'``.

        Members come from ``UserNameList`` (split on ``'^'``); display
        names from ``DisplayNameList`` are matched to members by index.
        """
        logger.info("收到群推送任务: %s", trigger_chatroom_push_task)
        notice = ChatRoomPushNoticeMessage()
        notice.WeChatId = trigger_chatroom_push_task.WeChatId
        notice.TaskId = trigger_chatroom_push_task.TaskId
        for wxid, contact in self.robot.contacts.items():
            if 'chatroom' not in wxid:
                logger.info("非群组不推送: %s", wxid)
                continue
            logger.info("群组:%s", contact)
            room = ChatRoomMessage()
            room.UserName = wxid
            room.NickName = contact.get('name') or ''
            room.Avatar = contact.get('avatar') or ''
            owner = contact.get('Reserved2')
            if isinstance(owner, str):
                room.Owner = owner
            raw_members = contact.get('UserNameList') or ''
            # ''.split('^') would yield one empty member, so guard it.
            members = raw_members.split('^') if raw_members else []
            room.MemberList.extend(members)
            # NOTE(review): str.strip('^G') strips any leading/trailing '^'
            # or 'G' characters, not the two-character separator; kept
            # as-is — confirm against real DisplayNameList data.
            display_names = (contact.get('DisplayNameList') or '').strip('^G').split('^G')
            for index, member in enumerate(members):
                display = ChatRoomMessage.DisplayNameMessage()
                display.UserName = member
                if index < len(display_names):
                    display.ShowName = display_names[index]
                # NOTE(review): Flag is set for every member here; confirm
                # it was not meant only for members with a display name.
                display.Flag = 1
                room.ShowNameList.append(display)
            notice.ChatRooms.append(room)
        notice.Count = len(notice.ChatRooms)
        notice.Size = len(notice.ChatRooms)
        self.send(self._build_envelope(EnumMsgType.ChatroomPushNotice, notice))

    def trigger_friend_push_task_handle(self, trigger_friend_push_task: "TriggerFriendPushTaskMessage"):
        """Reply with the robot's personal contacts.

        Skips ids containing ``@chatroom``, ``@openim`` or
        ``@im.chatroom``, ids starting with ``gh_``, and contacts without
        a truthy ``Type``.
        """
        logger.info("收到好友推送任务: %s", trigger_friend_push_task)
        excluded_markers = ('@chatroom', '@openim', '@im.chatroom')
        notice = FriendPushNoticeMessage()
        notice.WeChatId = trigger_friend_push_task.WeChatId
        notice.TaskId = trigger_friend_push_task.TaskId
        for wxid, contact in self.robot.contacts.items():
            if any(marker in wxid for marker in excluded_markers) or wxid.startswith("gh_"):
                logger.info("群组/企业微信/公众号 不推送: %s", wxid)
                continue
            if not contact.get('Type'):
                continue
            friend = FriendMessage()
            friend.FriendId = wxid
            friend.FriendNo = contact.get('code') or ''
            friend.FriendNick = contact.get('name') or ''
            friend.Gender = self._gender_of(contact.get('gender'))
            friend.Avatar = contact.get('avatar') or ''
            friend.Country = contact.get('country') or ''
            friend.Province = contact.get('province') or ''
            friend.City = contact.get('city') or ''
            friend.Remark = contact.get('remark') or ''
            friend.Memo = contact.get('remark') or ''
            friend.Type = contact.get('Type')
            notice.Friends.append(friend)
        notice.Count = len(notice.Friends)
        notice.Size = len(notice.Friends)
        self.send(self._build_envelope(EnumMsgType.FriendPushNotice, notice))

    def send(self, transport_message: "TransportMessage"):
        """Serialize ``transport_message`` and write it as one frame.

        The frame is the 4-byte big-endian payload length followed by the
        payload. Writes are serialized with a lock because this method is
        called from the register thread, the listener thread and the
        robot's message callback.

        Raises:
            ConnectionError: if the client has no socket.
            OSError: if the underlying ``sendall`` fails.
        """
        serialized_data = transport_message.SerializeToString()
        frame = len(serialized_data).to_bytes(4, byteorder='big') + serialized_data
        sock = self.client_socket
        if sock is None:
            raise ConnectionError("socket client is not connected")
        with self._send_lock:
            sock.sendall(frame)
|