import json import logging import signal from queue import Empty from threading import Thread from typing import Optional import requests import openai from google.protobuf import json_format from openai import OpenAI, api_key, base_url, AuthenticationError, APIConnectionError, APIError from wcferry import Wcf, WxMsg, wcf_pb2 from common.log import logger from config import Config, conf from plugins.plugin import Plugin class Robot(): def __init__(self, config: Config, wcf: Wcf) -> None: self.wcf = wcf self.config = config self.LOG = logger self.wxid = self.wcf.get_self_wxid() self.user = self.wcf.get_user_info() self.allContacts = self.getAllContacts() self.plugins: list[Plugin] = [] # self.aiClient = OpenAI(api_key=self.config.get("openai_key"), base_url=self.config.get("openai_base")) self.LOG.info(f"{self.user} 登录成功") def register_plugin(self, plugin: Plugin): self.plugins.append(plugin) def enableRecvMsg(self) -> None: """ 打开消息通知,可能会丢消息 :return: """ self.wcf.enable_recv_msg(self.onMsg) def enableReceivingMsg(self) -> None: """ 打开消息通知,使用消息队列的方式获取 :return: """ def innerProcessMsg(wcf: Wcf): while wcf.is_receiving_msg(): try: msg = wcf.get_msg() # self.LOG.info(msg) # self.processMsg(msg) self.onMsg(msg) except Empty: continue # Empty message except Exception as e: self.LOG.error(f"Receiving message error: {e}") self.wcf.enable_receiving_msg() Thread(target=innerProcessMsg, name="GetMessage", args=(self.wcf,), daemon=True).start() def onMsg(self, msg: WxMsg) -> int: """ 消息处理 :param msg: :return: """ # 判断 self.config.get("api_base") 是否包含 gkscrm.com 域 if "gkscrm.com" not in self.config.get("api_base"): return 0 try: self.LOG.info(f"Received message: {msg}") # 打印信息 self.processMsg(msg) except Exception as e: self.LOG.error(e) return 0 def processMsg(self, msg: WxMsg) -> None: """当接收到消息的时候,会调用本方法。如果不实现本方法,则打印原始消息。 此处可进行自定义发送的内容,如通过 msg.content 关键字自动获取当前天气信息,并发送到对应的群组@发送者 群号:msg.roomid 微信ID:msg.sender 消息内容:msg.content content = "xx天气信息为:" receivers = msg.roomid self.sendTextMsg(content, receivers, msg.sender) """ rsp = "" if msg.from_self(): return elif msg.is_text(): if msg.from_group(): if "ALL" not in self.config.get("contacts_white_list") and "ALL_GROUPS" not in self.config.get( "contacts_white_list") and msg.roomid not in self.config.get( "contacts_white_list") and self.allContacts.get(msg.roomid) not in self.config.get( "contacts_white_list"): return if msg.is_at(self.wxid) is False and "@" + self.user.get("name") not in msg.content: return rsp = self.get_answer(msg) # 闲聊 else: if "ALL" not in self.config.get("contacts_white_list") and "ALL_CONTACTS" not in self.config.get( "contacts_white_list") and msg.sender not in self.config.get( "contacts_white_list") and self.allContacts.get(msg.sender) not in self.config.get( "contacts_white_list"): return rsp = self.get_answer(msg) # 闲聊 if rsp != '': try: json_object = json.loads(rsp) if "type" in json_object[0] and "content" in json_object[0]: for item in json_object: if item["content"] == "": continue if item["type"] == "TEXT": if msg.from_group(): self.sendTextMsg(item["content"], msg.roomid, msg.sender) else: self.sendTextMsg(item["content"], msg.sender) elif item["type"] == "IMAGE_URL" or item["type"] == "IMAGE": if msg.from_group(): self.wcf.send_image(item["content"], msg.roomid) else: self.wcf.send_image(item["content"], msg.sender) elif item["type"] == "FILE" or item["type"] == "FILE_URL" or item["type"] == "VIDEO_URL": if msg.from_group(): self.wcf.send_file(item["content"], msg.roomid) else: self.wcf.send_file(item["content"], msg.sender) except json.JSONDecodeError: if msg.from_group(): self.sendTextMsg(rsp, msg.roomid, msg.sender) else: self.sendTextMsg(rsp, msg.sender) def get_answer(self, msg: WxMsg) -> str: rsp = "" for plugin in self.plugins: rsp = plugin.answer(msg, self.wxid) if rsp != "": break return rsp # try: # # self.aiClient.api_key = self.config.get("api_key") # # self.aiClient.base_url = self.config.get("api_base") # # # 在fastgpt的时候增加chatId字段 # if "fastgpt" in self.config.get("api_base"): # extra_body = { # "chatId": "chatId-" + msg.sender + "-" + msg.roomid # } # else: # extra_body = {} # # ret = self.aiClient.chat.completions.create( # model=self.config.get("open_ai_model", "gpt-3.5-turbo"), # max_tokens=self.config.get("open_ai_max_tokens", 8192), # temperature=self.config.get("open_ai_temperature", 0.7), # top_p=self.config.get("open_ai_top_p", 1), # extra_body=extra_body, # messages=[ # {"role": "user", "content": msg.content} # ] # ) # rsp = ret.choices[0].message.content # rsp = rsp[2:] if rsp.startswith("\n\n") else rsp # rsp = rsp.replace("\n\n", "\n") # self.LOG.info(rsp) # except AuthenticationError: # self.LOG.error("OpenAI API 认证失败,请检查 API 密钥是否正确") # except APIConnectionError: # self.LOG.error("无法连接到 OpenAI API,请检查网络连接") # except APIError as e1: # self.LOG.error(f"OpenAI API 返回了错误:{str(e1)}") # except Exception as e0: # self.LOG.error(f"发生未知错误:{str(e0)}") # return rsp def sendTextMsg(self, msg: str, receiver: str, at_list: str = "") -> int: """ 发送消息 :param msg: 消息字符串 :param receiver: 接收人wxid或者群id :param at_list: 要@的wxid, @所有人的wxid为:notify@all """ # msg 中需要有 @ 名单中一样数量的 @ ats = "" if at_list: if at_list == "notify@all": # @所有人 ats = " @所有人" else: wxids = at_list.split(",") for wxid in wxids: # 根据 wxid 查找群昵称 ats += f" @{self.wcf.get_alias_in_chatroom(wxid, receiver)}" # {msg}{ats} 表示要发送的消息内容后面紧跟@,例如 北京天气情况为:xxx @张三 if ats == "": self.LOG.info(f"To {receiver}: {msg}") result = self.wcf.send_text(f"{msg}", receiver, at_list) return result else: self.LOG.info(f"To {receiver}: {ats}\r{msg}") result = self.wcf.send_text(f"{ats}\n\n{msg}", receiver, at_list) return result def getAllContacts(self) -> dict: """ 获取联系人(包括好友、公众号、服务号、群成员……) 格式: {"wxid": "NickName"} """ contacts = self.wcf.query_sql("MicroMsg.db", "SELECT UserName, NickName FROM Contact;") return {contact["UserName"]: contact["NickName"] for contact in contacts} def keepRunningAndBlockProcess(self) -> None: """ 保持机器人运行,不让进程退出 """ self.wcf.keep_running() def getFriendOrChatRoomList(self, type: str, label_id: int) -> list: """ 获取好友列表或者群列表 :param type: 1: 联系人 2: 群组 :param label_id: :return: 好友列表或者群列表 """ contacts_list = [] contacts = self.wcf.query_sql("MicroMsg.db", "select UserName, NickName, LabelIDList, Type from Contact") not_contacts = { "fmessage": "朋友推荐消息", "medianote": "语音记事本", "floatbottle": "漂流瓶", "filehelper": "文件传输助手", "newsapp": "新闻", "mphelper": "公众平台安全助手", } for cnt in contacts: if ((type == "联系人" and (cnt["Type"] != 3 or (label_id != 0 and not is_int_in_string(label_id, cnt["LabelIDList"])))) or # 群组 (type == "群组" and (cnt["Type"] != 2 or not cnt["UserName"].endswith("@chatroom"))) or # 联系人 cnt["UserName"].startswith("gh_") or # 公众号 cnt["UserName"] in not_contacts.keys() # 其他杂号 ): continue contact = { "wxid": cnt.get("UserName", ""), "name": cnt.get("NickName", "")} contacts_list.append(contact) return contacts_list def getContactLabelList(self) -> list: """ 查询数据库 :param sql: 查询语句 :param params: 参数 :return: 查询结果 """ return self.wcf.query_sql("MicroMsg.db", "select * from ContactLabel") robot: Optional[Robot] = None def init_robot(): global robot wcf = Wcf(debug=conf().get("debug", False)) def handler(sig, frame): wcf.cleanup() # 退出前清理环境 exit(0) signal.signal(signal.SIGINT, handler) robot = Robot(conf(), wcf) def get_robot(): return robot def is_int_in_string(int_value, str_values): # 拆分字符串并去除多余空格 split_values = map(str.strip, str_values.split(',')) # 过滤掉空字符串,转换为整数列表 values_list = list(map(int, filter(lambda x: x != '', split_values))) # 检查整数是否在列表中 return int_value in values_list