import json
import logging
from queue import Empty
from threading import Thread
from typing import Callable, Optional

import blackboxprotobuf
import openai
from openai import OpenAI, api_key, base_url, AuthenticationError, APIConnectionError, APIError
from wcferry import Wcf, WxMsg

from common.log import logger
from config import Config


class Robot():
    """WeChat auto-reply robot.

    Receives messages through a ``Wcf`` client, forwards whitelisted text
    messages to an OpenAI-compatible chat completion endpoint and sends the
    answer (plain text, or a JSON list of typed items) back to the chat.
    """

    def __init__(self, config: Config, wcf: Wcf) -> None:
        """Bind the client and configuration and load account/contact data.

        :param config: configuration object read through ``config.get(key)``
        :param wcf: connected ``Wcf`` client
        """
        self.wcf = wcf
        self.config = config
        self.LOG = logger
        self.wxid = self.wcf.get_self_wxid()
        # {wxid: NickName}; processMsg() uses it to match nicknames against
        # the white list, so it has to exist before any message is handled.
        self.allContacts = self.getAllContacts()
        # Created lazily in get_answer(), so a missing or invalid API key is
        # logged on the first request instead of failing construction.
        self.aiClient = None
        self.contacts = self.get_contacts()
        self.user = self.get_user_info()
        self.LOG.info(f"{self.user} 登录成功")

    def enableRecvMsg(self) -> None:
        """Enable message notification via a direct callback to ``onMsg``.

        Messages may be lost in this mode.
        """
        self.wcf.enable_recv_msg(self.onMsg)

    def enableReceivingMsgCallback(self, callback: Optional[Callable[[WxMsg], None]] = None) -> bool:
        """Enable queue-based message receiving and hand each message to ``callback``.

        :param callback: callable taking a single ``WxMsg``; when ``None`` the
            built-in ``onMsg`` handler is used instead
        :return: always ``True``
        """
        if callback is None:
            self.enableReceivingMsg()
            return True
        self._start_receiver(callback, log_msg=True)
        return True

    def enableReceivingMsg(self) -> None:
        """Enable queue-based message receiving, handled by ``onMsg``."""
        self._start_receiver(self.onMsg)

    def _start_receiver(self, handler: Callable[[WxMsg], object], log_msg: bool = False) -> None:
        """Turn on message receiving and poll the queue in a daemon thread.

        :param handler: called with every received message
        :param log_msg: log the raw message before calling ``handler``
        """
        def innerProcessMsg(wcf: Wcf):
            while wcf.is_receiving_msg():
                try:
                    msg = wcf.get_msg()
                    if log_msg:
                        self.LOG.info(msg)
                    handler(msg)
                except Empty:
                    continue  # Empty message
                except Exception as e:
                    # Keep the polling loop alive whatever the handler raises.
                    self.LOG.error(f"Receiving message error: {e}")

        self.wcf.enable_receiving_msg()
        Thread(target=innerProcessMsg, name="GetMessage", args=(self.wcf,), daemon=True).start()

    def onMsg(self, msg: WxMsg) -> int:
        """Entry point for a received message.

        :param msg: the received message
        :return: always ``0``
        """
        # Messages are only processed when the configured api_base contains
        # the gkscrm.com domain; a missing api_base counts as "not contained".
        api_base = self.config.get("api_base") or ""
        if "gkscrm.com" not in api_base:
            return 0
        try:
            self.LOG.info(f"Received message: {msg}")  # log the message
            self.processMsg(msg)
        except Exception as e:
            self.LOG.error(e)
        return 0

    def processMsg(self, msg: WxMsg) -> None:
        """Handle one received message and reply when appropriate.

        Only text messages not sent by the logged-in account are answered.
        Group messages additionally require the group to be whitelisted and
        the robot to be mentioned; private messages require the sender to be
        whitelisted. The answer comes from ``get_answer``.

        Useful message fields: group id ``msg.roomid``, sender wxid
        ``msg.sender``, text ``msg.content``.
        """
        if msg.from_self() or not msg.is_text():
            return

        if msg.from_group():
            if not self._is_whitelisted(msg.roomid, "ALL_GROUPS"):
                return
            if not self._is_mentioned(msg):
                return
        elif not self._is_whitelisted(msg.sender, "ALL_CONTACTS"):
            return

        rsp = self.get_answer(msg)  # chat
        if not rsp:
            return
        self._send_reply(msg, rsp)

    def _is_whitelisted(self, chat_id: str, wildcard: str) -> bool:
        """Return True when ``chat_id`` (or its nickname) passes the white list.

        :param chat_id: wxid of the contact or id of the group
        :param wildcard: category wildcard, ``"ALL_GROUPS"`` or ``"ALL_CONTACTS"``
        """
        white_list = self.config.get("contacts_white_list") or []
        if "ALL" in white_list or wildcard in white_list or chat_id in white_list:
            return True
        nickname = self.allContacts.get(chat_id)
        return nickname is not None and nickname in white_list

    def _is_mentioned(self, msg: WxMsg) -> bool:
        """Return True when a group message @-mentions the robot."""
        if msg.is_at(self.wxid):
            return True
        name = self.user.get("name")
        # Without a known name a bare "@" would match every mention.
        return bool(name) and f"@{name}" in msg.content

    def _send_reply(self, msg: WxMsg, rsp: str) -> None:
        """Send ``rsp`` back to the chat ``msg`` came from.

        ``rsp`` is either plain text or a JSON list of
        ``{"type": ..., "content": ...}`` items, where type is one of TEXT,
        IMAGE / IMAGE_URL, FILE / FILE_URL / VIDEO_URL. Anything that does not
        match that structure is sent as plain text.
        """
        try:
            payload = json.loads(rsp)
        except json.JSONDecodeError:
            payload = None

        is_structured = (
            isinstance(payload, list)
            and len(payload) > 0
            and isinstance(payload[0], dict)
            and "type" in payload[0]
            and "content" in payload[0]
        )
        if not is_structured:
            self._reply_text(msg, rsp)
            return

        receiver = msg.roomid if msg.from_group() else msg.sender
        for item in payload:
            if not isinstance(item, dict):
                continue
            content = item.get("content", "")
            if content == "" or content is None:
                continue
            kind = item.get("type")
            if kind == "TEXT":
                self._reply_text(msg, content)
            elif kind in ("IMAGE_URL", "IMAGE"):
                self.wcf.send_image(content, receiver)
            elif kind in ("FILE", "FILE_URL", "VIDEO_URL"):
                self.wcf.send_file(content, receiver)

    def _reply_text(self, msg: WxMsg, text: str) -> None:
        """Send ``text`` to the originating chat, @-ing the sender in groups."""
        if msg.from_group():
            self.sendTextMsg(text, msg.roomid, msg.sender)
        else:
            self.sendTextMsg(text, msg.sender)

    def get_answer(self, msg: WxMsg) -> str:
        """Ask the configured chat completion endpoint to answer ``msg``.

        :param msg: the message whose ``content`` is sent as the user prompt
        :return: the answer text, or ``""`` when the request failed (the
            error is logged)
        """
        rsp = ""
        try:
            key = self.config.get("api_key")
            api_base = self.config.get("api_base")
            if getattr(self, "aiClient", None) is None:
                self.aiClient = OpenAI(api_key=key, base_url=api_base)
            else:
                # Re-apply on every call so configuration changes take effect.
                self.aiClient.api_key = key
                self.aiClient.base_url = api_base

            # For fastgpt endpoints add the chatId field.
            if "fastgpt" in (api_base or ""):
                extra_body = {
                    "chatId": "chatId-"+msg.sender
                }
            else:
                extra_body = {}

            ret = self.aiClient.chat.completions.create(
                model=self.config.get("open_ai_model", "gpt-3.5-turbo"),
                max_tokens=self.config.get("open_ai_max_tokens", 8192),
                temperature=self.config.get("open_ai_temperature", 0.7),
                top_p=self.config.get("open_ai_top_p", 1),
                extra_body=extra_body,
                messages=[
                    {"role": "user", "content": msg.content}
                ]
            )
            # content may be None; normalise so the return type stays str.
            rsp = ret.choices[0].message.content or ""
            rsp = rsp[2:] if rsp.startswith("\n\n") else rsp
            rsp = rsp.replace("\n\n", "\n")
            self.LOG.info(rsp)
        except AuthenticationError:
            self.LOG.error("OpenAI API 认证失败,请检查 API 密钥是否正确")
        except APIConnectionError:
            self.LOG.error("无法连接到 OpenAI API,请检查网络连接")
        except APIError as e1:
            self.LOG.error(f"OpenAI API 返回了错误:{str(e1)}")
        except Exception as e0:
            self.LOG.error(f"发生未知错误:{str(e0)}")

        return rsp

    def sendTextMsg(self, msg: str, receiver: str, at_list: str = "") -> None:
        """Send a text message.

        :param msg: message text
        :param receiver: wxid of the recipient, or the group id
        :param at_list: comma-separated wxids to @; use ``notify@all`` to @ everyone
        """
        # The text must contain as many @ mentions as there are ids in at_list.
        ats = ""
        if at_list:
            if at_list == "notify@all":  # @ everyone
                ats = " @所有人"
            else:
                # Resolve each wxid to its alias inside the group.
                ats = "".join(
                    f" @{self.wcf.get_alias_in_chatroom(wxid, receiver)}"
                    for wxid in at_list.split(",")
                )

        if ats == "":
            self.LOG.info(f"To {receiver}: {msg}")
            self.wcf.send_text(f"{msg}", receiver, at_list)
        else:
            self.LOG.info(f"To {receiver}: {ats}\r{msg}")
            self.wcf.send_text(f"{ats}\n\n{msg}", receiver, at_list)

    def getAllContacts(self) -> dict:
        """Return every row of the Contact table as ``{"wxid": "NickName"}``."""
        contacts = self.wcf.query_sql("MicroMsg.db", "SELECT UserName, NickName FROM Contact;")
        return {contact["UserName"]: contact["NickName"] for contact in contacts}

    def keepRunningAndBlockProcess(self) -> None:
        """Keep the robot running so the process does not exit."""
        self.wcf.keep_running()

    @staticmethod
    def _sql_literal(value) -> str:
        """Quote ``value`` as a SQL string literal, doubling single quotes."""
        return "'" + str(value).replace("'", "''") + "'"

    def get_contacts(self):
        """Return the contacts as ``{wxid: contact_dict}``.

        Each contact from ``wcf.get_contacts()`` is enriched with its avatar
        URL, its Contact row and, for chat rooms, its ChatRoom row.
        """
        contacts = self.wcf.get_contacts()
        by_wxid = {}
        for contact in contacts:
            wxid = contact['wxid']
            quoted = self._sql_literal(wxid)

            headimg = self.wcf.query_sql("MicroMsg.db", f"select * from ContactHeadImgUrl WHERE usrName = {quoted} limit 1")
            if len(headimg) > 0:
                contact['avatar'] = str(headimg[0].get('smallHeadImgUrl', ''))

            res = self.wcf.query_sql("MicroMsg.db", f"select * from Contact where UserName={quoted} limit 1")
            if len(res) > 0:
                contact.update(res[0])

            if "chatroom" in wxid:
                chatroom_res = self.wcf.query_sql("MicroMsg.db", f"select * from ChatRoom where ChatRoomName={quoted} limit 1")
                if len(chatroom_res) > 0:
                    contact.update(chatroom_res[0])

            if "ExtraBuf" in contact and isinstance(contact['ExtraBuf'], bytes):
                contact['ExtraBuf'] = contact['ExtraBuf'].decode('utf-8', errors='replace')
            by_wxid[contact['wxid']] = contact
        return by_wxid

    def get_user_info(self):
        """Return the logged-in account's info, enriched with avatar and contact data."""
        user = self.wcf.get_user_info()
        headimg = self.wcf.query_sql("MicroMsg.db", f"select * from ContactHeadImgUrl WHERE usrName = {self._sql_literal(self.wxid)} limit 1")
        # The account may have no avatar row; fall back to an empty URL.
        user['avatar'] = str(headimg[0].get('smallHeadImgUrl', '')) if headimg else ''
        if getattr(self, "contacts", None) is None:
            self.contacts = self.get_contacts()
        if user['wxid'] in self.contacts:
            user.update(self.contacts.get(user['wxid']))
        return user

    def get_BytesExtra(self, BytesExtra: bytes):
        """Decode a protobuf ``BytesExtra`` blob.

        :return: the decoded message, or ``None`` when decoding fails (the
            failure is logged as a warning)
        """
        try:
            deserialize_data, _ = blackboxprotobuf.decode_message(BytesExtra)
            return deserialize_data
        except Exception as e:
            logger.warning(f"\nget_BytesExtra: {e}\n{BytesExtra}", exc_info=True)
            return None