123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- import json
- import logging
- from queue import Empty
- from threading import Thread
- from typing import Callable
- import blackboxprotobuf
- import openai
- from openai import OpenAI, api_key, base_url, AuthenticationError, APIConnectionError, APIError
- from wcferry import Wcf, WxMsg
- from common.log import logger
- from config import Config
class Robot:
    """WeChat auto-reply bot built on top of a wcferry ``Wcf`` client.

    Incoming text messages that pass the configured white list are forwarded
    to an OpenAI-compatible chat-completion endpoint, and the answer is sent
    back to the originating contact or chat room.
    """

    def __init__(self, config: "Config", wcf: "Wcf") -> None:
        """Bind the bot to a logged-in ``Wcf`` client and cache account data.

        :param config: configuration object exposing ``get(key, default)``
        :param wcf: wcferry client used for all WeChat operations
        """
        self.wcf = wcf
        self.config = config
        self.LOG = logger
        self.wxid = self.wcf.get_self_wxid()
        # The OpenAI client is built lazily (see _get_ai_client) so that a
        # missing API key is reported when answering, not at start-up.
        self.aiClient = None
        # wxid -> nickname map, consulted by the white-list check.
        self.allContacts = self.getAllContacts()
        self.contacts = self.get_contacts()
        self.user = self.get_user_info()
        self.LOG.info(f"{self.user} 登录成功")

    def enableRecvMsg(self) -> None:
        """Enable message notification via ``Wcf.enable_recv_msg``.

        The original note on this method warns that messages may be lost
        when using this mode.
        """
        self.wcf.enable_recv_msg(self.onMsg)

    def enableReceivingMsgCallback(self, callback: "Callable[[WxMsg], None] | None" = None) -> bool:
        """Enable queue-based message receiving and hand each message to ``callback``.

        :param callback: callable taking a single ``WxMsg``; when ``None`` the
            bot's own :meth:`onMsg` handling is used instead
        :return: always ``True`` once the receiver thread has been started
        """
        if callback is None:
            self.enableReceivingMsg()
            return True

        def log_and_forward(msg) -> None:
            self.LOG.info(msg)
            callback(msg)

        self._start_receiving(log_and_forward)
        return True

    def enableReceivingMsg(self) -> None:
        """Enable queue-based message receiving, handled by :meth:`onMsg`."""
        self._start_receiving(self.onMsg)

    def _start_receiving(self, handler) -> None:
        """Start the daemon thread that pumps the wcferry queue into ``handler``."""

        def pump(wcf) -> None:
            while wcf.is_receiving_msg():
                try:
                    handler(wcf.get_msg())
                except Empty:
                    continue  # nothing queued yet
                except Exception as e:
                    # Keep the loop alive: one bad message must not stop receiving.
                    self.LOG.error(f"Receiving message error: {e}")

        self.wcf.enable_receiving_msg()
        Thread(target=pump, name="GetMessage", args=(self.wcf,), daemon=True).start()

    def onMsg(self, msg: "WxMsg") -> int:
        """Entry point for a received message.

        Messages are ignored unless the configured ``api_base`` contains
        ``gkscrm.com``. Any exception raised while processing is logged and
        swallowed so the receiver keeps running.

        :param msg: the received message
        :return: always ``0``
        """
        api_base = self.config.get("api_base") or ""
        if "gkscrm.com" not in api_base:
            return 0
        try:
            self.LOG.info(f"Received message: {msg}")
            self.processMsg(msg)
        except Exception as e:
            self.LOG.error(e)
        return 0

    def processMsg(self, msg: "WxMsg") -> None:
        """Answer a text message if its sender or room is white-listed.

        Group messages are answered only when the bot is @-mentioned.
        Messages sent by the bot itself and non-text messages are ignored.

        :param msg: the received message (``roomid``, ``sender``, ``content``)
        """
        if msg.from_self() or not msg.is_text():
            return

        if msg.from_group():
            if not self._is_whitelisted(msg.roomid, "ALL_GROUPS"):
                return
            name = self.user.get("name")
            mentioned_by_name = bool(name) and f"@{name}" in msg.content
            if not msg.is_at(self.wxid) and not mentioned_by_name:
                return
        elif not self._is_whitelisted(msg.sender, "ALL_CONTACTS"):
            return

        rsp = self.get_answer(msg)
        if rsp:
            self._send_reply(msg, rsp)

    def _is_whitelisted(self, contact_id: str, wildcard: str) -> bool:
        """Return True if ``contact_id`` (or its nickname) passes the white list."""
        white_list = self.config.get("contacts_white_list") or []
        if "ALL" in white_list or wildcard in white_list or contact_id in white_list:
            return True
        nickname = self.allContacts.get(contact_id)
        return nickname is not None and nickname in white_list

    @staticmethod
    def _parse_structured_reply(rsp: str):
        """Return the list of ``{"type", "content"}`` items encoded in ``rsp``, or None.

        None means the reply is not a structured payload and must be sent as
        plain text.
        """
        try:
            parsed = json.loads(rsp)
        except (json.JSONDecodeError, TypeError):
            return None
        if not isinstance(parsed, list) or not parsed:
            return None
        for item in parsed:
            if not isinstance(item, dict) or "type" not in item or "content" not in item:
                return None
        return parsed

    def _send_reply(self, msg: "WxMsg", rsp: str) -> None:
        """Deliver ``rsp`` to the room or contact that ``msg`` came from."""
        in_group = msg.from_group()
        target = msg.roomid if in_group else msg.sender
        at_list = msg.sender if in_group else ""

        items = self._parse_structured_reply(rsp)
        if items is None:
            self.sendTextMsg(rsp, target, at_list)
            return

        for item in items:
            kind, content = item["type"], item["content"]
            if content == "":
                continue
            if kind == "TEXT":
                self.sendTextMsg(content, target, at_list)
            elif kind in ("IMAGE_URL", "IMAGE"):
                self.wcf.send_image(content, target)
            elif kind in ("FILE", "FILE_URL", "VIDEO_URL"):
                self.wcf.send_file(content, target)

    def _get_ai_client(self):
        """Return the OpenAI client, creating it on first use.

        The key and base URL are re-applied from the configuration on every
        call, as the original code did.
        """
        api_key = self.config.get("api_key")
        api_base = self.config.get("api_base")
        client = getattr(self, "aiClient", None)
        if client is None:
            client = OpenAI(api_key=api_key, base_url=api_base)
            self.aiClient = client
        else:
            client.api_key = api_key
            client.base_url = api_base
        return client

    def get_answer(self, msg: "WxMsg") -> str:
        """Ask the chat-completion endpoint for a reply to ``msg.content``.

        :param msg: the message to answer
        :return: the reply text, or ``""`` when the request fails or the
            model returns no content (errors are logged, never raised)
        """
        rsp = ""
        try:
            client = self._get_ai_client()
            api_base = self.config.get("api_base") or ""
            # FastGPT endpoints get a per-sender chatId in the request body.
            if "fastgpt" in api_base:
                extra_body = {"chatId": "chatId-" + msg.sender}
            else:
                extra_body = {}
            ret = client.chat.completions.create(
                model=self.config.get("open_ai_model", "gpt-3.5-turbo"),
                max_tokens=self.config.get("open_ai_max_tokens", 8192),
                temperature=self.config.get("open_ai_temperature", 0.7),
                top_p=self.config.get("open_ai_top_p", 1),
                extra_body=extra_body,
                messages=[
                    {"role": "user", "content": msg.content}
                ],
            )
            # content may be None; normalise so callers always get a str.
            rsp = ret.choices[0].message.content or ""
            rsp = rsp[2:] if rsp.startswith("\n\n") else rsp
            rsp = rsp.replace("\n\n", "\n")
            self.LOG.info(rsp)
        except AuthenticationError:
            self.LOG.error("OpenAI API 认证失败,请检查 API 密钥是否正确")
        except APIConnectionError:
            self.LOG.error("无法连接到 OpenAI API,请检查网络连接")
        except APIError as e1:
            self.LOG.error(f"OpenAI API 返回了错误:{str(e1)}")
        except Exception as e0:
            self.LOG.error(f"发生未知错误:{str(e0)}")
        return rsp

    def sendTextMsg(self, msg: str, receiver: str, at_list: str = "") -> None:
        """Send a text message, optionally @-mentioning chat-room members.

        :param msg: message text
        :param receiver: wxid of the contact or id of the chat room
        :param at_list: comma-separated wxids to mention; ``notify@all``
            mentions everyone
        """
        # The text must contain one "@name" per wxid in at_list.
        ats = ""
        if at_list:
            if at_list == "notify@all":
                ats = " @所有人"
            else:
                ats = "".join(
                    f" @{self.wcf.get_alias_in_chatroom(wxid, receiver)}"
                    for wxid in at_list.split(",")
                )

        if ats == "":
            self.LOG.info(f"To {receiver}: {msg}")
            self.wcf.send_text(f"{msg}", receiver, at_list)
        else:
            # The mentions go first, followed by a blank line and the text.
            self.LOG.info(f"To {receiver}: {ats}\r{msg}")
            self.wcf.send_text(f"{ats}\n\n{msg}", receiver, at_list)

    def getAllContacts(self) -> dict:
        """Return every row of the ``Contact`` table as ``{wxid: nickname}``."""
        contacts = self.wcf.query_sql("MicroMsg.db", "SELECT UserName, NickName FROM Contact;")
        return {contact["UserName"]: contact["NickName"] for contact in contacts}

    def keepRunningAndBlockProcess(self) -> None:
        """Block the calling thread so the process does not exit."""
        self.wcf.keep_running()

    @staticmethod
    def _sql_quote(value: str) -> str:
        """Return ``value`` as a single-quoted SQL string literal."""
        # query_sql takes a raw statement, so embedded quotes are doubled here.
        return "'" + str(value).replace("'", "''") + "'"

    def get_contacts(self) -> dict:
        """Return contacts keyed by wxid, enriched with database columns.

        Each contact from ``Wcf.get_contacts`` is merged with its avatar URL,
        its ``Contact`` row and, for chat rooms, its ``ChatRoom`` row.
        """
        contacts_by_wxid = {}
        for contact in self.wcf.get_contacts():
            wxid = contact['wxid']
            quoted = self._sql_quote(wxid)

            headimg = self.wcf.query_sql(
                "MicroMsg.db",
                f"select * from ContactHeadImgUrl WHERE usrName = {quoted} limit 1")
            if headimg:
                contact['avatar'] = str(headimg[0].get('smallHeadImgUrl', ''))

            res = self.wcf.query_sql(
                "MicroMsg.db",
                f"select * from Contact where UserName={quoted} limit 1")
            if res:
                contact.update(res[0])

            if "chatroom" in wxid:
                chatroom_res = self.wcf.query_sql(
                    "MicroMsg.db",
                    f"select * from ChatRoom where ChatRoomName={quoted} limit 1")
                if chatroom_res:
                    contact.update(chatroom_res[0])

            # ExtraBuf arrives as raw bytes; store it as text instead.
            if isinstance(contact.get('ExtraBuf'), bytes):
                contact['ExtraBuf'] = contact['ExtraBuf'].decode('utf-8', errors='replace')

            contacts_by_wxid[wxid] = contact
        return contacts_by_wxid

    def get_user_info(self) -> dict:
        """Return the logged-in account's info, with avatar and contact data merged in."""
        user = self.wcf.get_user_info()
        headimg = self.wcf.query_sql(
            "MicroMsg.db",
            f"select * from ContactHeadImgUrl WHERE usrName = {self._sql_quote(self.wxid)} limit 1")
        # An account without an avatar row gets an empty avatar, not an IndexError.
        user['avatar'] = str(headimg[0].get('smallHeadImgUrl', '')) if headimg else ''

        if getattr(self, "contacts", None) is None:
            self.contacts = self.get_contacts()
        own_contact = self.contacts.get(user['wxid'])
        if own_contact:
            user.update(own_contact)
        return user

    def get_BytesExtra(self, BytesExtra: bytes):
        """Decode a protobuf blob with ``blackboxprotobuf``.

        :param BytesExtra: raw bytes to decode
        :return: the decoded message, or ``None`` if decoding fails
        """
        try:
            deserialize_data, _message_type = blackboxprotobuf.decode_message(BytesExtra)
            return deserialize_data
        except Exception as e:
            logger.warning(f"\nget_BytesExtra: {e}\n{BytesExtra}", exc_info=True)
            return None
|