123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322 |
- import json
- import logging
- import signal
- from queue import Empty
- from threading import Thread
- from typing import Optional
- import requests
- import openai
- from google.protobuf import json_format
- from openai import OpenAI, api_key, base_url, AuthenticationError, APIConnectionError, APIError
- from wcferry import Wcf, WxMsg, wcf_pb2
- from common.log import logger
- from config import Config, conf
- from plugins.plugin import Plugin
class Robot:
    """WeChat bot built on top of a wcferry ``Wcf`` client.

    Incoming text messages are filtered by the configured white list, handed
    to the registered plugins until one of them produces an answer, and the
    answer is sent back as text, image or file messages.
    """

    def __init__(self, config: Config, wcf: Wcf) -> None:
        """Bind the bot to *wcf* and cache the logged-in account's data.

        :param config: configuration object; read through ``config.get(key)``
        :param wcf: connected wcferry client
        """
        self.wcf = wcf
        self.config = config
        self.LOG = logger
        self.wxid = self.wcf.get_self_wxid()
        self.user = self.wcf.get_user_info()
        self.allContacts = self.getAllContacts()
        self.plugins: list[Plugin] = []
        self.LOG.info(f"{self.user} 登录成功")

    def register_plugin(self, plugin: Plugin):
        """Append *plugin* to the list consulted by :meth:`get_answer`."""
        self.plugins.append(plugin)

    def enableRecvMsg(self) -> None:
        """Enable message notifications via callback; messages may be lost."""
        self.wcf.enable_recv_msg(self.onMsg)

    def enableReceivingMsg(self) -> None:
        """Enable message notifications and consume them from the message queue.

        A daemon thread named ``GetMessage`` polls ``wcf.get_msg()`` for as
        long as ``wcf.is_receiving_msg()`` is true and forwards every message
        to :meth:`onMsg`.
        """

        def innerProcessMsg(wcf: Wcf):
            while wcf.is_receiving_msg():
                try:
                    msg = wcf.get_msg()
                    self.onMsg(msg)
                except Empty:
                    continue  # queue had nothing for us; poll again
                except Exception as e:
                    # Keep the polling thread alive whatever a handler raises.
                    self.LOG.error(f"Receiving message error: {e}")

        self.wcf.enable_receiving_msg()
        Thread(target=innerProcessMsg, name="GetMessage", args=(self.wcf,), daemon=True).start()

    def onMsg(self, msg: WxMsg) -> int:
        """Entry point for every received message.

        Messages are ignored unless the configured ``api_base`` contains
        ``gkscrm.com``.  Any exception raised while processing is logged and
        swallowed so that the receiving loop keeps running.

        :param msg: the received message
        :return: always 0
        """
        # A missing api_base is treated like a non-matching one instead of
        # raising TypeError on the ``in`` test.
        api_base = self.config.get("api_base") or ""
        if "gkscrm.com" not in api_base:
            return 0
        try:
            self.LOG.info(f"Received message: {msg}")
            self.processMsg(msg)
        except Exception as e:
            self.LOG.error(f"Failed to process message: {e}")
        return 0

    def processMsg(self, msg: WxMsg) -> None:
        """Answer a text message if its origin is white-listed.

        Messages sent by the bot itself and non-text messages are ignored.
        Group messages are answered only when the room is white-listed and
        the bot is mentioned; private messages only when the sender is
        white-listed.

        :param msg: the received message (``msg.roomid`` is the group id,
            ``msg.sender`` the sender's wxid, ``msg.content`` the text)
        """
        if msg.from_self() or not msg.is_text():
            return

        if msg.from_group():
            if not self._is_whitelisted(msg.roomid, "ALL_GROUPS"):
                return
            if not self._is_mentioned(msg):
                return
        elif not self._is_whitelisted(msg.sender, "ALL_CONTACTS"):
            return

        rsp = self.get_answer(msg)
        if rsp:
            self._send_reply(rsp, msg)

    def _is_whitelisted(self, contact_id: str, wildcard: str) -> bool:
        """Return True when *contact_id* (or its nickname) passes the white list.

        ``"ALL"`` and the given *wildcard* entry (``"ALL_GROUPS"`` or
        ``"ALL_CONTACTS"``) allow everything of that kind.
        """
        white_list = self.config.get("contacts_white_list") or []
        return (
            "ALL" in white_list
            or wildcard in white_list
            or contact_id in white_list
            or self.allContacts.get(contact_id) in white_list
        )

    def _is_mentioned(self, msg: WxMsg) -> bool:
        """Return True when a group message @-mentions the bot."""
        if msg.is_at(self.wxid):
            return True
        name = self.user.get("name")
        # Guard against a missing name, which would otherwise raise on "@" + None.
        return bool(name) and "@" + name in msg.content

    @staticmethod
    def _parse_structured_reply(rsp: str) -> Optional[list]:
        """Return the item list of a structured reply, or None for plain text.

        A structured reply is a non-empty JSON array whose first element is an
        object carrying both ``type`` and ``content``.
        """
        try:
            parsed = json.loads(rsp)
        except json.JSONDecodeError:
            return None
        if not isinstance(parsed, list) or not parsed:
            return None
        first = parsed[0]
        if not isinstance(first, dict) or "type" not in first or "content" not in first:
            return None
        return parsed

    def _send_reply(self, rsp: str, msg: WxMsg) -> None:
        """Send *rsp* back to where *msg* came from.

        Structured replies are sent item by item (text / image / file);
        anything else, including valid JSON of another shape, is sent
        verbatim as one text message.
        """
        in_group = msg.from_group()
        receiver = msg.roomid if in_group else msg.sender
        at_list = msg.sender if in_group else ""  # @ the asker in group chats

        items = self._parse_structured_reply(rsp)
        if items is None:
            self.sendTextMsg(rsp, receiver, at_list)
            return

        for item in items:
            if not isinstance(item, dict):
                continue
            content = item.get("content")
            if not content:
                continue  # nothing to send for this item
            kind = item.get("type")
            if kind == "TEXT":
                self.sendTextMsg(content, receiver, at_list)
            elif kind in ("IMAGE_URL", "IMAGE"):
                self.wcf.send_image(content, receiver)
            elif kind in ("FILE", "FILE_URL", "VIDEO_URL"):
                self.wcf.send_file(content, receiver)

    def get_answer(self, msg: WxMsg) -> str:
        """Return the first non-empty answer produced by a registered plugin.

        Plugins are asked in registration order; an empty string is returned
        when none of them answers.
        """
        for plugin in self.plugins:
            rsp = plugin.answer(msg, self.wxid)
            if rsp:
                return rsp
        return ""

    def sendTextMsg(self, msg: str, receiver: str, at_list: str = "") -> int:
        """Send a text message.

        :param msg: message text
        :param receiver: wxid of the recipient, or the group id
        :param at_list: comma-separated wxids to @; use ``notify@all`` to @
            everyone
        :return: the result of ``wcf.send_text``
        """
        # The text must contain as many "@name" markers as there are wxids in
        # at_list, so build that prefix here.
        ats = ""
        if at_list:
            if at_list == "notify@all":
                ats = " @所有人"
            else:
                # Look up each wxid's alias inside the chat room.
                ats = "".join(
                    f" @{self.wcf.get_alias_in_chatroom(wxid, receiver)}"
                    for wxid in at_list.split(",")
                )

        if ats == "":
            self.LOG.info(f"To {receiver}: {msg}")
            return self.wcf.send_text(f"{msg}", receiver, at_list)

        self.LOG.info(f"To {receiver}: {ats}\r{msg}")
        return self.wcf.send_text(f"{ats}\n\n{msg}", receiver, at_list)

    def sendFileMsg(self, file: str, receiver: str) -> int:
        """Send *file* to *receiver* and return the result of ``wcf.send_file``."""
        self.LOG.info(f"To {receiver}: {file}")
        return self.wcf.send_file(file, receiver)

    def getAllContacts(self) -> dict:
        """Return every row of the ``Contact`` table as ``{UserName: NickName}``.

        The table covers friends, official accounts, service accounts, group
        members and so on.
        """
        contacts = self.wcf.query_sql("MicroMsg.db", "SELECT UserName, NickName FROM Contact;")
        return {contact["UserName"]: contact["NickName"] for contact in contacts}

    def keepRunningAndBlockProcess(self) -> None:
        """Keep the robot running so that the process does not exit."""
        self.wcf.keep_running()

    def getFriendOrChatRoomList(self, type: str, label_id: int) -> list:
        """Return the friend list or the group list.

        :param type: ``"联系人"`` selects contacts (rows with ``Type == 3``),
            ``"群组"`` selects groups (rows with ``Type == 2`` whose UserName
            ends with ``@chatroom``); any other value applies neither filter
        :param label_id: for contacts, keep only rows whose ``LabelIDList``
            contains this id; ``0`` disables the label filter
        :return: list of ``{"wxid": ..., "name": ...}`` dicts
        """
        not_contacts = {
            "fmessage": "朋友推荐消息",
            "medianote": "语音记事本",
            "floatbottle": "漂流瓶",
            "filehelper": "文件传输助手",
            "newsapp": "新闻",
            "mphelper": "公众平台安全助手",
        }
        contacts_list = []
        for cnt in self.getContactList():
            username = cnt["UserName"]
            # Official accounts ("gh_" prefix) and built-in helper accounts
            # are never returned.
            if username.startswith("gh_") or username in not_contacts:
                continue
            if type == "联系人":
                if cnt["Type"] != 3:
                    continue
                # LabelIDList may be NULL in the database; treat it as empty.
                if label_id != 0 and not is_int_in_string(label_id, cnt["LabelIDList"] or ""):
                    continue
            elif type == "群组":
                if cnt["Type"] != 2 or not username.endswith("@chatroom"):
                    continue
            contacts_list.append({
                "wxid": cnt.get("UserName", ""),
                "name": cnt.get("NickName", ""),
            })
        return contacts_list

    def getContactLabelList(self) -> list:
        """Return every row of the ``ContactLabel`` table."""
        return self.wcf.query_sql("MicroMsg.db", "select * from ContactLabel")

    def getAllTables(self):
        """Log every database known to the client and the tables of each one."""
        self.LOG.info("getAllTables")
        dbs = self.wcf.get_dbs()
        self.LOG.info(f"getAllDBs = {dbs}")
        for db in dbs:
            tables = self.wcf.get_tables(db)
            self.LOG.info(f"db={db}, getAllTables = {tables}")
            for table in tables:
                self.LOG.info(f"table={table}")

    def getContacts(self):
        """Return the result of ``wcf.get_contacts()``."""
        return self.wcf.get_contacts()

    def getUserInfo(self):
        """Return the result of ``wcf.get_user_info()``."""
        return self.wcf.get_user_info()

    def getUserInfoByWxid(self, wx_id):
        """Return the result of ``wcf.get_info_by_wxid(wx_id)``."""
        return self.wcf.get_info_by_wxid(wx_id)

    def getContactList(self):
        """Return UserName, NickName, LabelIDList and Type of every ``Contact`` row."""
        return self.wcf.query_sql("MicroMsg.db", "select UserName, NickName, LabelIDList, Type from Contact")

    def insertLabel(self, label_name: str):
        """Insert a row named *label_name* into the ``ContactLabel`` table.

        ``query_sql`` is called with a finished statement only, so the name is
        embedded as a quoted SQL string literal with single quotes doubled.

        :return: the result of ``wcf.query_sql``
        """
        escaped = label_name.replace("'", "''")
        return self.wcf.query_sql(
            "MicroMsg.db",
            f"insert into ContactLabel(LabelName) values('{escaped}')",
        )
# Module-level Robot instance; stays None until init_robot() assigns it.
robot: Optional[Robot] = None


def init_robot():
    """Create the wcferry client and the module-level ``robot``.

    Also installs a SIGINT handler that cleans up the client and then exits
    the process with status 0.
    """
    # Imported locally so this function does not depend on the file's import
    # block; the builtin ``exit`` is injected by ``site`` and is not
    # guaranteed to exist (e.g. ``python -S`` or frozen builds).
    import sys

    global robot
    wcf = Wcf(debug=conf().get("debug", False))

    def handler(sig, frame):
        wcf.cleanup()  # clean up the environment before exiting
        sys.exit(0)

    signal.signal(signal.SIGINT, handler)
    robot = Robot(conf(), wcf)
def get_robot():
    """Return the current value of the module-level ``robot`` variable."""
    return robot
def is_int_in_string(int_value, str_values):
    """Return True when *int_value* occurs in a comma-separated integer list.

    :param int_value: the integer to look for
    :param str_values: text such as ``"1, 3,5"``; ``None`` or an empty string
        is treated as an empty list
    :return: True if any entry, parsed as an integer, equals *int_value*

    Surrounding whitespace and empty entries are ignored.  Entries that are
    not integers are skipped instead of raising ``ValueError``.
    """
    if not str_values:
        return False
    for token in str_values.split(','):
        token = token.strip()
        if not token:
            continue
        try:
            if int(token) == int_value:
                return True
        except ValueError:
            continue  # malformed entry; it cannot match an integer
    return False
|