from enum import Enum
from typing import Optional
import json
import random
import re
import time

from wcferry import WxMsg

from common.log import logger
from common.sql_lite import init_new_db_connection
from db.message_records import get_next_answer, create_message_record, check_message_record
from db.contact import get_contact_by_wxid, add_contact_label
from db.sop import get_stage
from .plugin import Plugin
from service.robot import get_robot


class Sop(Plugin):
    """Plugin that answers incoming WeChat messages according to configured SOP nodes.

    ``answer`` asks the language model which SOP node the user's message
    matches, sends that node's messages/forwards, and applies the node's
    label changes.  ``add_tag`` applies label changes and then fires every
    label-conditioned stage that now matches the contact.
    """

    # Bounds (seconds) of the random pause inserted before every outgoing message.
    SEND_DELAY_RANGE = (2.0, 5.0)

    # Matches "${...}" template placeholders in message text.
    _PLACEHOLDER_RE = re.compile(r'\$\{.*?\}')

    # Separators accepted between forward wxids: ASCII comma, full-width
    # comma (U+FF0C) and ideographic comma (U+3001).  Written with escapes so
    # the pattern survives encoding normalisation of the source file.
    _WXID_SEPARATOR_RE = re.compile("[,\uff0c\u3001]")

    def __init__(self):
        super().__init__()

    # ------------------------------------------------------------------ #
    # Entry point: answer an incoming message according to the SOP setup
    # ------------------------------------------------------------------ #
    def answer(self, msg: WxMsg, wx_wxid: Optional[str] = None):
        """Reply to ``msg`` with the content configured on the matching SOP node.

        Steps:
          1. Load the last bot message and candidate nodes via ``get_next_answer``.
          2. Ask the model which node matches (skipped when no node has a
             real condition); fall back to the "any content" backup node.
          3. Send the node's messages and forwards, recording each message
             sent to the contact.
          4. Apply the node's label changes through ``add_tag``.

        Args:
            msg: The incoming wcferry message.
            wx_wxid: wxid of the logged-in account.  It is also used as the
                recipient of the node's messages (see NOTE below).

        Returns:
            ``msg.content`` on success; ``None`` when an exception occurred
            (the transaction is rolled back and the error is logged).
        """
        robot = get_robot()
        connection = init_new_db_connection()
        cursor = connection.cursor()

        bot_wxid = wx_wxid  # wxid of the logged-in WeChat account
        contact_wxid = msg.sender  # wxid of the person who sent the message
        # NOTE(review): the recipient is the same value as bot_wxid, not
        # msg.sender -- confirm what callers actually pass as wx_wxid.
        receiver = wx_wxid
        # 1 = private chat with the bot, 2 = chat with the bot inside a group
        contact_type = 2 if msg.from_group() else 1
        content = msg.content

        try:
            # Fetch the running SOP task's last message plus the candidate
            # nodes, decide which node the user's reply satisfies, then
            # return the content configured for that node.
            message_record, sop_nodes = get_next_answer(cursor, bot_wxid, contact_wxid, contact_type)
            if message_record and sop_nodes:
                organization_id = message_record["organization_id"]
                prompt, need_judge, backup_node_id = self._build_match_prompt(
                    message_record["content"], content, sop_nodes)

                if need_judge:
                    # Keyword arguments: reply()'s first positional
                    # parameter is `query`, not `prompt`.
                    reply_content = self.reply(content, prompt=prompt, sender=contact_wxid).content
                else:
                    reply_content = "None"

                node_order = self._resolve_node_index(reply_content, backup_node_id, len(sop_nodes))
                if node_order is not None:
                    node = sop_nodes[node_order]
                    messages = []
                    forwards = []
                    have_var = False

                    # Messages sent back once the node is hit.
                    if node['action_message'] is not None:
                        messages, have_var = self._build_outgoing(
                            json.loads(node['action_message']), receiver)

                    record_ctx = {
                        "bot_wxid": bot_wxid,
                        "contact_id": message_record["contact_id"],
                        "contact_type": contact_type,
                        "contact_wxid": contact_wxid,
                        "source_type": 4,
                        "source_id": node["id"],
                        "organization_id": organization_id,
                    }
                    if not messages:
                        # Nothing to send: still write an empty record so the
                        # node hit is persisted.
                        create_message_record(cursor, 3, bot_wxid, message_record["contact_id"],
                                              contact_type, contact_wxid, 1, "", {}, 4,
                                              node["id"], 0, organization_id)

                    # Messages forwarded to human operators once the node is hit.
                    if node['action_forward'] is not None:
                        forwards, forward_has_var = self._build_forwards(
                            json.loads(node['action_forward']))
                        have_var = have_var or forward_has_var

                    contactinfo = get_contact_by_wxid(receiver) if have_var else {}
                    self._dispatch(robot, cursor, messages, forwards, contactinfo, have_var, record_ctx)

                    action_label_add = json.loads(node['action_label_add'])
                    action_label_del = json.loads(node['action_label_del'])
                    action_label_add_list = json.loads(node['action_label_add_list'])
                    action_label_del_list = json.loads(node['action_label_del_list'])
                    if action_label_add or action_label_del:
                        stages = get_stage(organization_id)
                        # add_tag works on its own connection: commit first so
                        # it can see the records written above and does not
                        # contend with this still-open write transaction.
                        connection.commit()
                        self.add_tag(bot_wxid, message_record["contact_id"], contact_type, contact_wxid,
                                     action_label_add, action_label_del,
                                     action_label_add_list, action_label_del_list,
                                     stages, organization_id)
            connection.commit()
            return msg.content
        except Exception as e:
            # Roll back the transaction and report the failure.
            connection.rollback()
            logger.exception("发生错误: %s", e)
        finally:
            # Always release the database resources.
            cursor.close()
            connection.close()

    # ------------------------------------------------------------------ #
    # Language-model round trip
    # ------------------------------------------------------------------ #
    def reply(self, query, prompt: str = None, sender: str = None):
        """Ask the language model and wrap its answer in a ``Reply``.

        Args:
            query: Unused by this implementation; kept for interface
                compatibility.
            prompt: Text sent to the model as the single user message.
            sender: Passed through to ``self._client_reply``.

        Returns:
            ``Reply(ReplyType.TEXT, <answer>)`` when the client returned
            something, otherwise ``Reply(ReplyType.ERROR, "None")``.
        """
        messages = [{"role": "user", "content": prompt}]
        # _client_reply / openAiClient are not defined in this file;
        # presumably provided by the Plugin base class.
        reply_content = self._client_reply(self.openAiClient, messages, sender)
        if reply_content is None:
            return Reply(ReplyType.ERROR, "None")
        return Reply(ReplyType.TEXT, reply_content)

    # ------------------------------------------------------------------ #
    # Small pure helpers (static so that `self.helper(...)` calls work)
    # ------------------------------------------------------------------ #
    @staticmethod
    def contains_placeholder(s: str) -> bool:
        """Return True when ``s`` contains at least one ``${...}`` placeholder."""
        return bool(Sop._PLACEHOLDER_RE.search(s))

    @staticmethod
    def var_replace(s: str, contactinfo: dict) -> str:
        """Substitute ``${nickname}`` in ``s`` with ``contactinfo["nickname"]``.

        ``contactinfo`` is only read when the placeholder is actually
        present, so messages without it never fail on missing contact data.
        """
        if "${nickname}" not in s:
            return s
        return s.replace("${nickname}", contactinfo["nickname"])

    @staticmethod
    def split_string(input_string):
        """Split a wxid list on ASCII comma, full-width comma or ideographic comma.

        Surrounding whitespace is stripped and empty entries are dropped so a
        trailing separator does not produce an empty recipient.
        """
        parts = Sop._WXID_SEPARATOR_RE.split(input_string)
        return [part.strip() for part in parts if part.strip()]

    @staticmethod
    def check_filter(filter, contact_label_ids):
        """Evaluate a stage's label conditions against the contact's label ids.

        ``filter['condition_operator']``: 1 = every condition must hold,
        2 = any one condition suffices.  For each condition, ``equal == 1``
        requires ``labelIdList`` to be a subset of the contact's labels and
        ``equal == 2`` requires it not to be.

        Returns:
            True when the filter matches; False otherwise (including unknown
            operators).
        """
        condition_operator = filter['condition_operator']
        condition_list = json.loads(filter['condition_list'])
        owned = set(contact_label_ids)

        def holds(condition):
            contained = set(condition['labelIdList']).issubset(owned)
            if condition['equal'] == 1:
                return contained
            if condition['equal'] == 2:
                return not contained
            # Unknown comparison kinds are neutral (None): they neither fail
            # an "all" filter nor satisfy an "any" filter.
            return None

        if condition_operator == 1:
            return all(holds(condition) is not False for condition in condition_list)
        if condition_operator == 2:
            return any(holds(condition) is True for condition in condition_list)
        return False

    @staticmethod
    def _load_json(raw):
        """Decode a JSON text value; pass through anything that is not text.

        Empty text decodes to ``None``.
        """
        if isinstance(raw, (str, bytes, bytearray)):
            return json.loads(raw) if raw else None
        return raw

    @staticmethod
    def _build_match_prompt(previous_content, user_content, sop_nodes):
        """Build the node-matching prompt for the model.

        Returns:
            ``(prompt, need_judge, backup_node_id)`` where ``need_judge`` is
            True when at least one node has a real condition, and
            ``backup_node_id`` is the index of the last node that accepts any
            content (``condition_list == '[""]'`` with
            ``no_reply_condition == 0``) or ``-1`` when there is none.
        """
        prompt = (
            "# 任务\n"
            "请根据历史消息,判断用户回复的内容或深层意图,与哪个节点的意图相匹配。\n"
            "\n"
            "# 历史消息:\n"
            f"助手发送:{previous_content}\n"
            f"用户回复:{user_content}\n"
            "\n"
            "# 节点列表:"
        )
        need_judge = False
        backup_node_id = -1
        for index, sop_node in enumerate(sop_nodes):
            if sop_node['condition_list'] != '[""]':
                need_judge = True
                prompt += (
                    f"\n节点 id: {index}\n"
                    f"节点意图:{sop_node['condition_list']}\n"
                )
            elif sop_node['no_reply_condition'] == 0:
                # '[""]' means "any content": such nodes are not shown to the
                # model and only serve as the fallback.
                backup_node_id = index
        prompt += (
            "\n# 回复要求\n"
            "- 如果命中节点:则仅回复节点 id 数字(如命中多个节点,则仅回复最小值)\n"
            "- 如果未命中节点:则仅回复一个单词: None"
        )
        return prompt, need_judge, backup_node_id

    @staticmethod
    def _resolve_node_index(reply_content, backup_node_id, node_count):
        """Turn the model's answer into a valid node index.

        Returns the parsed index when it is an integer within
        ``[0, node_count)``; otherwise the backup node index, or ``None``
        when there is no backup (``backup_node_id == -1``).
        """
        text = "" if reply_content is None else str(reply_content).strip()
        try:
            index = int(text)
        except ValueError:
            index = -1
        if 0 <= index < node_count:
            return index
        return backup_node_id if backup_node_id != -1 else None

    # ------------------------------------------------------------------ #
    # Payload construction / sending shared by answer() and add_tag()
    # ------------------------------------------------------------------ #
    def _build_outgoing(self, entries, target_wxid):
        """Convert configured message entries into payloads addressed to ``target_wxid``.

        Entries with empty ``content`` are skipped.  ``type == 1`` becomes a
        text payload; any other type becomes a file payload.

        Returns:
            ``(payloads, has_placeholder)`` -- ``has_placeholder`` is True
            when any text entry contains a ``${...}`` placeholder.
        """
        payloads = []
        has_placeholder = False
        for entry in entries:
            if entry['content'] == "":
                continue
            if int(entry['type']) == 1:
                has_placeholder = has_placeholder or self.contains_placeholder(entry['content'])
                payloads.append({
                    "type": 1,
                    "message": {
                        "wxid": target_wxid,
                        "msg": entry['content'],
                    },
                })
            else:
                payloads.append({
                    "type": 2,
                    "message": {
                        "wxid": target_wxid,
                        "filepath": entry['content'],
                        "diyfilename": entry['meta']['filename'],
                    },
                })
        return payloads, has_placeholder

    def _build_forwards(self, action_forward):
        """Expand a forward action into one payload per recipient and message.

        Returns:
            ``(payloads, has_placeholder)``; both empty/False when the
            forward action has no recipients.
        """
        payloads = []
        has_placeholder = False
        if action_forward['wxid'] == "":
            return payloads, has_placeholder
        for forward_wxid in self.split_string(action_forward['wxid']):
            built, flagged = self._build_outgoing(action_forward['action'], forward_wxid)
            payloads.extend(built)
            has_placeholder = has_placeholder or flagged
        return payloads, has_placeholder

    def _send_payload(self, robot, payload, contactinfo, substitute):
        """Send one payload through ``robot``, substituting placeholders in text if asked."""
        body = payload["message"]
        if payload["type"] == 1:
            if substitute:
                body["msg"] = self.var_replace(body["msg"], contactinfo)
            robot.sendTextMsg(body["msg"], body["wxid"])
        else:
            robot.sendFileMsg(body["filepath"], body["wxid"])

    @staticmethod
    def _record_payload(cursor, payload, index, *, bot_wxid, contact_id, contact_type,
                        contact_wxid, source_type, source_id, organization_id):
        """Persist one sent payload through ``create_message_record``.

        ``source_type`` / ``source_id`` are the values callers pass in the
        10th/11th positions (4 + node id from ``answer``, 3 + stage id from
        ``add_tag``).
        """
        body = payload["message"]
        if payload["type"] == 1:
            stored_content, meta = body["msg"], {}
        else:
            stored_content, meta = body["filepath"], {"filename": body["diyfilename"]}
        create_message_record(cursor, 3, bot_wxid, contact_id, contact_type, contact_wxid,
                              payload["type"], stored_content, meta,
                              source_type, source_id, index, organization_id)

    def _dispatch(self, robot, cursor, messages, forwards, contactinfo, have_var, record_ctx):
        """Send ``messages`` (recording each one) and then ``forwards`` (not recorded).

        A random pause is inserted before every send.
        """
        for index, payload in enumerate(messages):
            time.sleep(random.uniform(*self.SEND_DELAY_RANGE))
            self._send_payload(robot, payload, contactinfo, have_var)
            self._record_payload(cursor, payload, index, **record_ctx)
        for payload in forwards:
            time.sleep(random.uniform(*self.SEND_DELAY_RANGE))
            self._send_payload(robot, payload, contactinfo, have_var)

    # ------------------------------------------------------------------ #
    # Label handling
    # ------------------------------------------------------------------ #
    def add_tag(self, bot_wxid, contact_id, contact_type, contact_wxid, action_label_add, action_label_del,
                action_label_add_list, action_label_del_list, stages, organization_id):
        """Apply label changes to a contact and fire every stage that now matches.

        After ``add_contact_label`` returns the contact's label ids, each
        stage with ``condition_type == 1`` whose filter matches is executed
        once (skipped when ``check_message_record`` reports it already ran):
        its messages and forwards are sent, and its own label actions are
        applied recursively.

        Raises:
            Any exception from the database or robot calls, after rolling
            back this method's own transaction.
        """
        connection = init_new_db_connection()
        cursor = connection.cursor()
        try:
            robot = get_robot()
            contact_label_ids = add_contact_label(contact_id, action_label_add, action_label_del,
                                                  action_label_add_list, action_label_del_list)
            match_stages = [
                stage for stage in stages
                if stage["condition_type"] == 1 and self.check_filter(stage, contact_label_ids)
            ]

            for stage in match_stages:
                if check_message_record(cursor, contact_wxid, 3, stage["id"], 0):
                    continue  # this stage was already delivered to the contact

                messages = []
                forwards = []
                have_var = False
                if stage["action_message"]:
                    messages, have_var = self._build_outgoing(
                        json.loads(stage['action_message']), contact_wxid)

                record_ctx = {
                    "bot_wxid": bot_wxid,
                    "contact_id": contact_id,
                    "contact_type": contact_type,
                    "contact_wxid": contact_wxid,
                    "source_type": 3,
                    "source_id": stage["id"],
                    "organization_id": organization_id,
                }
                if not messages:
                    # Write an empty record so the stage counts as executed.
                    create_message_record(cursor, 3, bot_wxid, contact_id, contact_type, contact_wxid,
                                          1, "", {}, 3, stage["id"], 0, organization_id)

                if stage["action_forward"]:
                    forwards, forward_has_var = self._build_forwards(
                        json.loads(stage['action_forward']))
                    have_var = have_var or forward_has_var

                contactinfo = get_contact_by_wxid(contact_wxid) if have_var else {}
                # File messages are recorded as well, so the "already sent"
                # check above also works when a stage starts with a file.
                self._dispatch(robot, cursor, messages, forwards, contactinfo, have_var, record_ctx)

                stage_label_add = self._load_json(stage["action_label_add"])
                stage_label_del = self._load_json(stage["action_label_del"])
                if stage_label_add or stage_label_del:
                    # The nested call opens its own connection: commit first
                    # so it sees this stage's records and is not blocked by
                    # an open write transaction.
                    connection.commit()
                    self.add_tag(bot_wxid, contact_id, contact_type, contact_wxid,
                                 stage_label_add, stage_label_del,
                                 self._load_json(stage['action_label_add_list']),
                                 self._load_json(stage['action_label_del_list']),
                                 stages, organization_id)
            connection.commit()
        except Exception:
            connection.rollback()
            raise
        finally:
            cursor.close()
            connection.close()


class ReplyType(Enum):
    """Kinds of reply payload."""

    TEXT = 1  # text
    VOICE = 2  # audio file
    IMAGE = 3  # image file
    IMAGE_URL = 4  # image URL
    VIDEO_URL = 5  # video URL
    FILE = 6  # file
    CARD = 7  # WeChat contact card, ntchat only
    INVITE_ROOM = 8  # invite a friend into a group
    INFO = 9
    ERROR = 10
    TEXT_ = 11  # forced text
    VIDEO = 12
    MINIAPP = 13  # mini program
    JSON_MULTIPLE_RESP = 14  # JSON carrying multiple replies
    LOCATION = 48  # location message

    def __str__(self):
        return self.name


class Reply:
    """A reply value: a ``ReplyType`` plus its content."""

    def __init__(self, stype: ReplyType = None, content=None):
        self.type = stype
        self.content = content

    def __str__(self):
        return "Reply(type={}, content={})".format(self.type, self.content)