socket_client.py 24 KB


  1. import base64
  2. import json
  3. import os
  4. import socket
  5. import sys
  6. sys.path.append(os.getcwd()+'/workphone')
  7. import threading
  8. import time
  9. import uuid
  10. from typing import Optional
  11. from wcferry import WxMsg
  12. from service.robot import Robot
  13. from common.log import logger
  14. from workphone.DeviceAuthReq_pb2 import DeviceAuthReqMessage
  15. from workphone.DeviceAuthRsp_pb2 import DeviceAuthRspMessage
  16. from workphone.ErrorMessage_pb2 import ErrorMessage
  17. from workphone.TransportMessage_pb2 import TransportMessage, EnumMsgType, EnumContentType, EnumGender
  18. from workphone.WeChatOnlineNotice_pb2 import WeChatOnlineNoticeMessage
  19. from workphone.FriendTalkNotice_pb2 import FriendTalkNoticeMessage
  20. from workphone.WeChatTalkToFriendNotice_pb2 import WeChatTalkToFriendNoticeMessage
  21. from workphone.TalkToFriendTask_pb2 import TalkToFriendTaskMessage
  22. from workphone.TalkToFriendTaskResultNotice_pb2 import TalkToFriendTaskResultNoticeMessage
  23. from workphone.FriendAddNotice_pb2 import FriendMessage
  24. from workphone.FriendPushNotice_pb2 import FriendPushNoticeMessage
  25. from workphone.TriggerFriendPushTask_pb2 import TriggerFriendPushTaskMessage
  26. from workphone.ChatRoomPushNotice_pb2 import ChatRoomPushNoticeMessage, ChatRoomMessage
  27. from workphone.ConversationPushNotice_pb2 import ConversationPushNoticeMessage, ConversMessage
  28. from workphone.HistoryMsgPushNotice_pb2 import HistoryMsgPushNoticeMessage, ChatMessage
  29. from workphone.TriggerChatRoomPushTask_pb2 import TriggerChatRoomPushTaskMessage
  30. from workphone.TriggerConversationPushTask_pb2 import TriggerConversationPushTaskMessage
  31. from workphone.TriggerHistoryMsgPushTask_pb2 import TriggerHistoryMsgPushTaskMessage
class SocketClient:
    """Client that exchanges length-prefixed ``TransportMessage`` frames with a server over TCP."""

    # Token copied from the server's DeviceAuthRsp message; None until authentication succeeds.
    accessToken = None
    # Connected TCP socket; assigned in start() and reset to None in stop().
    client_socket = None
    # Thread running register() (device auth, online notice, heartbeat).
    thread_register = None
    # Thread running listen_for_messages().
    thread_onmessage = None
    # True while the client is running; the worker loops check it to decide when to exit.
    status = False
  38. def __init__(self,robot:Optional[Robot]):
  39. if robot:
  40. self.robot = robot
  41. logger.info("[socket] 获取用户信息: %s", self.robot.user)
  42. self.start()
  43. def stop(self):
  44. self.status = False
  45. self.thread_register.join()
  46. self.thread_onmessage.join()
  47. self.client_socket.close()
  48. logger.info("[socket] socket client closed")
  49. self.accessToken = None
  50. self.client_socket = None
  51. self.thread_register = None
  52. self.thread_onmessage = None
  53. def start(self):
  54. self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  55. self.client_socket.connect(("chat.gkscrm.com", 13087))
  56. # keep_alive 设置, 有心跳包,这个应该用不到,服务端也没按这个保活
  57. # self.client_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
  58. # self.client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60) # 空闲60秒后启动保活检测
  59. # self.client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) # 每隔30秒发送一次保活包
  60. # self.client_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
  61. logger.info("[socket] socket client connected")
  62. self.status = True
  63. # 开新线程 注册设备,注册完成后,发起上线通知,然后定时发送心跳包
  64. self.thread_register = threading.Thread(target=self.register, daemon=True)
  65. self.thread_register.start()
  66. # 监听消息
  67. self.thread_onmessage = threading.Thread(target=self.listen_for_messages, daemon=True)
  68. self.thread_onmessage.start()
  69. def listen_for_messages(self):
  70. while True and self.status:
  71. # 接收实际数据
  72. response_length_data = self.client_socket.recv(4)
  73. response_length = int.from_bytes(response_length_data, byteorder='big')
  74. response_data = self.client_socket.recv(response_length)
  75. logger.info("[socket] 接收到的原始数据: %s", response_data)
  76. # 接收数据解包
  77. message = TransportMessage()
  78. message.ParseFromString(response_data)
  79. logger.info("[socket] 接收到的 TransportMessage: %s", message)
  80. if message.MsgType != EnumMsgType.MsgReceivedAck and message.MsgType != EnumMsgType.DeviceAuthRsp and self.accessToken:
  81. # 构造发送数据
  82. transport_message = TransportMessage()
  83. transport_message.MsgType = EnumMsgType.MsgReceivedAck
  84. transport_message.AccessToken = self.accessToken
  85. transport_message.RefMessageId = message.Id
  86. self.send(transport_message)
  87. if message.MsgType == EnumMsgType.MsgReceivedAck:
  88. if message.AccessToken != self.accessToken:
  89. logger.error("[socket] 设备身份验证失败: accessToken 不匹配")
  90. continue
  91. else:
  92. logger.info("[socket] receive ack")
  93. continue
  94. elif message.MsgType == EnumMsgType.DeviceAuthRsp:
  95. device_auth_rsp = DeviceAuthRspMessage()
  96. message.Content.Unpack(device_auth_rsp)
  97. logger.info("[socket] 设备认证返回: %s", device_auth_rsp)
  98. self.accessToken = device_auth_rsp.AccessToken
  99. logger.info("[socket] 设备认证注册成功")
  100. elif message.MsgType == EnumMsgType.TriggerHistoryMsgPushTask:
  101. # 获取对话历史记录
  102. task = TriggerHistoryMsgPushTaskMessage()
  103. message.Content.Unpack(task)
  104. self.trigger_history_msg_push_task_handle(task)
  105. elif message.MsgType == EnumMsgType.TriggerConversationPushTask:
  106. # 刷新会话列表
  107. task = TriggerConversationPushTaskMessage()
  108. message.Content.Unpack(task)
  109. self.trigger_conversation_push_task_handle(task)
  110. elif message.MsgType == EnumMsgType.TriggerFriendPushTask:
  111. # 刷新联系人
  112. trigger_friend_push_task = TriggerFriendPushTaskMessage()
  113. message.Content.Unpack(trigger_friend_push_task)
  114. self.trigger_friend_push_task_handle(trigger_friend_push_task)
  115. elif message.MsgType == EnumMsgType.TriggerChatroomPushTask:
  116. # 刷新群组
  117. trigger_chatroom_push_task = TriggerChatRoomPushTaskMessage()
  118. message.Content.Unpack(trigger_chatroom_push_task)
  119. self.trigger_chatroom_push_task_handle(trigger_chatroom_push_task)
  120. elif message.MsgType == EnumMsgType.TalkToFriendTask:
  121. talk_to_friend_task = TalkToFriendTaskMessage()
  122. message.Content.Unpack(talk_to_friend_task)
  123. if talk_to_friend_task.Remark:
  124. sender = self.robot.contacts.get(talk_to_friend_task.Remark, {})
  125. content = "@" + sender.get("name") + " " + talk_to_friend_task.Content.decode("utf-8")
  126. talk_to_friend_task.Content = content.encode("utf-8")
  127. logger.info("[socket] 需要发送的消息如下: %s", talk_to_friend_task)
  128. send_status = -1
  129. if talk_to_friend_task.ContentType == EnumContentType.Text:
  130. logger.info("[socket] 发送文本消息")
  131. send_status = self.robot.wcf.send_text(talk_to_friend_task.Content.decode("utf-8"),talk_to_friend_task.FriendId)
  132. elif talk_to_friend_task.ContentType == EnumContentType.Picture:
  133. logger.info("[socket] 发送图片消息")
  134. send_status = self.robot.wcf.send_image(talk_to_friend_task.Content.decode("utf-8"),
  135. talk_to_friend_task.FriendId)
  136. elif talk_to_friend_task.ContentType == EnumContentType.File:
  137. logger.info("[socket] 发送文件消息")
  138. file_content = json.loads(talk_to_friend_task.Content.decode("utf-8"))
  139. if "url" in file_content:
  140. # wcferry 发送文件接口有错,使用发送图片接口代替
  141. send_status = self.robot.wcf.send_image(file_content.get("url",""),talk_to_friend_task.FriendId)
  142. else:
  143. logger.error("[socket] 发送文件失败,请检查文件地址是否正确")
  144. continue
  145. if send_status == 0:
  146. logger.info("[socket] 发送图片成功")
  147. msg_id = int(uuid.uuid4()) >> (128-19)
  148. notice = TalkToFriendTaskResultNoticeMessage()
  149. notice.Success = True
  150. notice.WeChatId = talk_to_friend_task.WeChatId
  151. notice.FriendId = talk_to_friend_task.FriendId
  152. notice.MsgId = talk_to_friend_task.MsgId
  153. notice.MsgSvrId = msg_id
  154. notice.CreateTime = int(time.time() * 1000)
  155. # 构造发送数据
  156. transport_message = TransportMessage()
  157. transport_message.MsgType = EnumMsgType.TalkToFriendTaskResultNotice
  158. transport_message.Content.Pack(notice)
  159. transport_message.AccessToken = self.accessToken
  160. self.send(transport_message)
  161. wechat_talk_to_friend_notice_message = WeChatTalkToFriendNoticeMessage()
  162. wechat_talk_to_friend_notice_message.WeChatId = talk_to_friend_task.WeChatId
  163. wechat_talk_to_friend_notice_message.FriendId = talk_to_friend_task.FriendId
  164. wechat_talk_to_friend_notice_message.ContentType = EnumContentType.Text
  165. wechat_talk_to_friend_notice_message.Content = talk_to_friend_task.Content
  166. wechat_talk_to_friend_notice_message.msgSvrId = msg_id
  167. wechat_talk_to_friend_notice_message.CreateTime = int(time.time() * 1000)
  168. # 构造发送数据
  169. transport_message = TransportMessage()
  170. transport_message.MsgType = EnumMsgType.WeChatTalkToFriendNotice
  171. transport_message.Content.Pack(wechat_talk_to_friend_notice_message)
  172. transport_message.AccessToken = self.accessToken
  173. elif message.MsgType == EnumMsgType.Error:
  174. err = ErrorMessage()
  175. message.Content.Unpack(err)
  176. logger.error("[socket] 错误信息: %s", err)
  177. def register(self):
  178. mac = uuid.getnode()
  179. while self.accessToken is None and self.status:
  180. # 构造设备认证消息
  181. device_auth = DeviceAuthReqMessage()
  182. device_auth.AuthType = DeviceAuthReqMessage.EnumAuthType.DeviceCode
  183. device_auth.Credential = "PC-" + str(mac)
  184. logger.info("[socket] 设备认证数据: %s", device_auth)
  185. # 构造发送数据
  186. transport_message = TransportMessage()
  187. transport_message.MsgType = EnumMsgType.DeviceAuthReq
  188. transport_message.Content.Pack(device_auth)
  189. self.send(transport_message)
  190. time.sleep(5)
  191. if self.status:
  192. # 开启消息接收处理
  193. self.robot.enableReceivingMsgCallback(self.on_msg)
  194. # 上线通知
  195. online = WeChatOnlineNoticeMessage()
  196. # logger.info(f"\n----------------self.robot.user: {self.robot.user}", exc_info=True)
  197. online.WeChatId = self.robot.user.get('wxid')
  198. online.WeChatNo = self.robot.user.get('code', "")
  199. online.WeChatNick = self.robot.user.get('name', "")
  200. if self.robot.user.get('gender') == "男":
  201. online.Gender = EnumGender.Male
  202. elif self.robot.user.get('gender') == "女":
  203. online.Gender = EnumGender.Female
  204. else:
  205. online.Gender = EnumGender.UnknownGender
  206. online.Country = self.robot.user.get('country', "")
  207. online.Province = self.robot.user.get('province', "")
  208. online.City = self.robot.user.get('city', "")
  209. online.Avatar = self.robot.user.get('avatar','')
  210. online.IMEI = "PC-" + str(mac)
  211. online.Phone = self.robot.user.get('mobile', "")
  212. # 构造发送数据
  213. transport_message = TransportMessage()
  214. transport_message.MsgType = EnumMsgType.WeChatOnlineNotice
  215. transport_message.Content.Pack(online)
  216. transport_message.AccessToken = self.accessToken
  217. self.send(transport_message)
  218. # 发送心跳包
  219. while True and self.status:
  220. transport_message = TransportMessage()
  221. transport_message.MsgType = EnumMsgType.HeartBeatReq
  222. transport_message.AccessToken = self.accessToken
  223. self.send(transport_message)
  224. time.sleep(30)
  225. def on_msg(self,msg:WxMsg):
  226. # msg 转 json
  227. logger.info("收到消息: %s", json.dumps(msg.__dict__,ensure_ascii=False, indent=4))
  228. notice = None
  229. transport_message = None
  230. if msg.from_self():
  231. if msg.is_text():
  232. notice = WeChatTalkToFriendNoticeMessage()
  233. notice.WeChatId = msg.sender
  234. notice.FriendId = msg.roomid
  235. if msg.is_text():
  236. notice.ContentType = EnumContentType.Text
  237. notice.Content = str(msg.content).encode('utf-8')
  238. notice.msgSvrId = msg.id
  239. notice.CreateTime = msg.ts
  240. logger.info("准备发送的消息体: %s", notice)
  241. # 构造发送数据
  242. transport_message = TransportMessage()
  243. transport_message.MsgType = EnumMsgType.WeChatTalkToFriendNotice
  244. transport_message.Content.Pack(notice)
  245. transport_message.AccessToken = self.accessToken
  246. else:
  247. if msg.is_text():
  248. logger.info('获取好友信息 start')
  249. sender = self.robot.contacts.get(msg.sender, {})
  250. logger.info('获取好友信息 end: %s',sender)
  251. logger.info("好友信息: %s", json.dumps(sender,ensure_ascii=False, indent=4))
  252. notice = FriendTalkNoticeMessage()
  253. notice.WeChatId = self.robot.user.get('wxid')
  254. notice.ContentType = EnumContentType.Text
  255. notice.msgSvrId = msg.id
  256. notice.CreateTime = msg.ts
  257. if msg.from_group():
  258. notice.FriendId = msg.roomid
  259. notice.Content = (msg.sender + ":\n" + str(msg.content)).encode('utf-8')
  260. else:
  261. notice.FriendId = msg.sender
  262. notice.Content = str(msg.content).encode('utf-8')
  263. notice.NickName = sender.get('name')
  264. logger.info("准备发送的消息体: %s", notice)
  265. # 构造发送数据
  266. transport_message = TransportMessage()
  267. transport_message.MsgType = EnumMsgType.FriendTalkNotice
  268. transport_message.Content.Pack(notice)
  269. transport_message.AccessToken = self.accessToken
  270. if transport_message:
  271. logger.info("[socket] 发送消息: %s", transport_message)
  272. self.send(transport_message)
  273. def trigger_history_msg_push_task_handle(self,task:TriggerHistoryMsgPushTaskMessage):
  274. logger.info("收到历史消息推送任务: %s", task)
  275. notice = HistoryMsgPushNoticeMessage()
  276. notice.WeChatId = task.WeChatId
  277. notice.TaskId = task.TaskId
  278. messages_list = self.robot.wcf.query_sql("MSG2.db",
  279. "select * from MSG where StrTalker='"+task.FriendId+"' order by localId DESC limit " + str(task.Count))
  280. for msg in messages_list:
  281. logger.info("消息: %s", msg)
  282. bytes_Extra = self.robot.get_BytesExtra(msg.get('BytesExtra'))
  283. chat_message = ChatMessage()
  284. chat_message.FriendId = task.FriendId
  285. if msg.get('Type') == 1:
  286. chat_message.ContentType = EnumContentType.Text
  287. elif msg.get('Type') == 3:
  288. chat_message.ContentType = EnumContentType.Picture
  289. elif msg.get('Type') == 34:
  290. chat_message.ContentType = EnumContentType.Voice
  291. elif msg.get('Type') == 43:
  292. chat_message.ContentType = EnumContentType.Video
  293. elif msg.get('Type') == 48:
  294. chat_message.ContentType = EnumContentType.Location
  295. elif msg.get('Type') == 49:
  296. chat_message.ContentType = EnumContentType.File
  297. else:
  298. chat_message.ContentType = EnumContentType.UnSupport
  299. chat_message.Content = msg.get('StrContent').encode('utf-8')
  300. chat_message.MsgId = msg.get('localId')
  301. chat_message.MsgSvrId = msg.get('MsgSvrID')
  302. chat_message.CreateTime = msg.get('CreateTime') * 1000
  303. if msg.get('IsSender') == 1:
  304. chat_message.IsSend = True
  305. chat_message.Content = msg.get('StrContent').encode('utf-8')
  306. else:
  307. chat_message.IsSend = False
  308. if 'chatroom' in task.FriendId:
  309. chat_message.Content = (bytes_Extra.get('3')[0].get('2') + ":" + msg.get('StrContent')).encode('utf-8')
  310. else:
  311. chat_message.Content = msg.get('StrContent').encode('utf-8')
  312. notice.Messages.append(chat_message)
  313. notice.Size = 50
  314. notice.Count = len(messages_list)
  315. # 构造发送数据
  316. transport_message = TransportMessage()
  317. transport_message.MsgType = EnumMsgType.HistoryMsgPushNotice
  318. transport_message.Content.Pack(notice)
  319. transport_message.AccessToken = self.accessToken
  320. self.send(transport_message)
  321. def trigger_conversation_push_task_handle(self,task:TriggerConversationPushTaskMessage):
  322. logger.info("收到会话推送任务: %s", task)
  323. notice = ConversationPushNoticeMessage()
  324. notice.WeChatId = task.WeChatId
  325. notice.TaskId = task.TaskId
  326. start_time = int(task.StartTime / 1000)
  327. sessions = (self.robot.wcf.query_sql("MicroMsg.db",
  328. "select * from Session where nTime > "+str(start_time)+" and nMsgType != 49 and nMsgType != 0 order by nTime DESC limit 500"))
  329. for session in sessions:
  330. logger.info("会话: %s", session)
  331. conver = ConversMessage()
  332. conver.UserName = session.get('strUsrName')
  333. conver.Digest = session.get('strContent')
  334. conver.UnreadCnt = session.get('nUnReadCount')
  335. conver.UpdateTime = session.get('nTime') * 1000
  336. conver.ShowName = session.get('strNickName')
  337. if session.get('strUsrName','') in self.robot.contacts:
  338. conver.Avatar = self.robot.contacts.get(session.get('strUsrName',''),'').get('avatar','')
  339. notice.Convers.append(conver)
  340. notice.Size = 500
  341. notice.Count = len(sessions)
  342. # 构造发送数据
  343. transport_message = TransportMessage()
  344. transport_message.MsgType = EnumMsgType.ConversationPushNotice
  345. transport_message.Content.Pack(notice)
  346. transport_message.AccessToken = self.accessToken
  347. self.send(transport_message)
  348. def trigger_chatroom_push_task_handle(self, trigger_chatroom_push_task: TriggerChatRoomPushTaskMessage):
  349. logger.info("收到群推送任务: %s", trigger_chatroom_push_task)
  350. chatroom_push_notice_message = ChatRoomPushNoticeMessage()
  351. chatroom_push_notice_message.WeChatId = trigger_chatroom_push_task.WeChatId
  352. chatroom_push_notice_message.TaskId = trigger_chatroom_push_task.TaskId
  353. for wxid,contact in self.robot.contacts.items():
  354. if 'chatroom' not in wxid:
  355. logger.info("非群组不推送: %s",wxid)
  356. continue
  357. logger.info("群组:%s",contact)
  358. chatroom_message = ChatRoomMessage()
  359. chatroom_message.UserName = wxid
  360. chatroom_message.NickName = contact.get('name')
  361. chatroom_message.Avatar = contact.get('avatar','')
  362. if isinstance(contact.get('Reserved2'),str):
  363. chatroom_message.Owner = contact.get('Reserved2','')
  364. username_list = contact.get('UserNameList','').split('^')
  365. for username in username_list:
  366. chatroom_message.MemberList.append(username)
  367. display_name_list = contact.get('DisplayNameList','').strip('^G').split('^G')
  368. for index,member in enumerate(chatroom_message.MemberList):
  369. display = ChatRoomMessage.DisplayNameMessage()
  370. display.UserName = member
  371. if index < len(display_name_list):
  372. display.ShowName = display_name_list[index]
  373. display.Flag = 1
  374. chatroom_message.ShowNameList.append(display)
  375. chatroom_push_notice_message.ChatRooms.append(chatroom_message)
  376. chatroom_push_notice_message.Count = len(chatroom_push_notice_message.ChatRooms)
  377. chatroom_push_notice_message.Size = len(chatroom_push_notice_message.ChatRooms)
  378. # 构造发送数据
  379. transport_message = TransportMessage()
  380. transport_message.MsgType = EnumMsgType.ChatroomPushNotice
  381. transport_message.Content.Pack(chatroom_push_notice_message)
  382. transport_message.AccessToken = self.accessToken
  383. self.send(transport_message)
  384. def trigger_friend_push_task_handle(self,trigger_friend_push_task: TriggerFriendPushTaskMessage):
  385. logger.info("收到好友推送任务: %s", trigger_friend_push_task)
  386. friend_push_notice_message = FriendPushNoticeMessage()
  387. friend_push_notice_message.WeChatId = trigger_friend_push_task.WeChatId
  388. friend_push_notice_message.TaskId = trigger_friend_push_task.TaskId
  389. for wxid,contact in self.robot.contacts.items():
  390. if '@chatroom' in wxid or '@openim' in wxid or '@im.chatroom' in wxid or wxid.startswith("gh_"):
  391. logger.info("群组/企业微信/公众号 不推送: %s",wxid)
  392. continue
  393. if not contact.get('Type'):
  394. continue
  395. friend_message = FriendMessage()
  396. friend_message.FriendId = wxid
  397. friend_message.FriendNo = contact.get('code')
  398. friend_message.FriendNick = contact.get('name')
  399. if contact.get('gender') == "男":
  400. friend_message.Gender = EnumGender.Male
  401. elif contact.get('gender') == "女":
  402. friend_message.Gender = EnumGender.Female
  403. else:
  404. friend_message.Gender = EnumGender.UnknownGender
  405. friend_message.Avatar = contact.get('avatar','')
  406. friend_message.Country = contact.get('country')
  407. friend_message.Province = contact.get('province')
  408. friend_message.City = contact.get('city')
  409. friend_message.Remark = contact.get('remark')
  410. friend_message.Memo = contact.get('remark')
  411. friend_message.Type = contact.get('Type')
  412. friend_push_notice_message.Friends.append(friend_message)
  413. friend_push_notice_message.Count = len(friend_push_notice_message.Friends)
  414. friend_push_notice_message.Size = len(friend_push_notice_message.Friends)
  415. # 构造发送数据
  416. transport_message = TransportMessage()
  417. transport_message.MsgType = EnumMsgType.FriendPushNotice
  418. transport_message.Content.Pack(friend_push_notice_message)
  419. transport_message.AccessToken = self.accessToken
  420. self.send(transport_message)
  421. def send(self,transport_message: TransportMessage):
  422. # 序列化发送数据,并按网络协议要求打包
  423. serialized_data = transport_message.SerializeToString()
  424. message_length = len(serialized_data)
  425. message = message_length.to_bytes(4, byteorder='big') + serialized_data
  426. # 发送请求
  427. self.client_socket.sendall(message)