123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- from typing import Optional
- import json
- import random
- import re
- import time
- from wcferry import WxMsg
- from common.log import logger
- from common.sql_lite import init_new_db_connection
- from enum import Enum
- from db.message_records import get_next_answer, create_message_record, check_message_record
- from db.contact import get_contact_by_wxid, add_contact_label
- from db.sop import get_stage
- from .plugin import Plugin
- from service.robot import get_robot
- class Sop(Plugin):
- def __init__(self):
- super().__init__()
- # 根据 sop 的设置返回回答内容
- def answer(self, msg: WxMsg, wx_wxid: Optional[str] = None):
- connection = init_new_db_connection()
- cursor = connection.cursor()
- robot = get_robot()
- bot_wxid = wx_wxid # 登陆微信人的wx_id
- contact_wxid = msg.sender # 发信息的人的wx_id
- receiver = wx_wxid # 接收消息的人
- contact_type = 2 if msg.from_group() else 1 # 1-人与机器人聊天 2-群内人与机器人聊天
- content = msg.content
- backup_node_id = -1 # 备份节点id
- need_judge = False # 是否需要判断询问大模型
- try:
- """
- 这里需要取出 执行状态的sop_task任务 然后根据他的sop_stage和sop_node内容判断是否
- 满足某个sop_node 要求,最后根据设置的 sop 来返回具体的内容
- """
- message_record, sop_nodes = get_next_answer(cursor, bot_wxid, contact_wxid, contact_type)
- if message_record and sop_nodes:
- prompt = f"""# 任务
- 请根据历史消息,判断用户回复的内容或深层意图,与哪个节点的意图相匹配。
- # 历史消息:
- 助手发送:{message_record["content"]}
- 用户回复:{content}
- # 节点列表:"""
- organization_id = message_record["organization_id"]
- for index, sop_node in enumerate(sop_nodes):
- # condition_list = json.loads(sop_node['condition_list'])
- """
- 代表任意内容,或者不回复,此时不把node节点内容塞给大模型
- """
- if sop_node['condition_list'] != '[""]':
- need_judge = True
- prompt += f"""
- 节点 id: {index}
- 节点意图:{sop_node['condition_list']}
- """
- else:
- if sop_node['no_reply_condition'] == 0:
- backup_node_id = index
- # prompt += f"""
- # 节点 id: {index}
- # 命中条件:用户发送任意内容
- # """
- prompt += f"""
- # 回复要求
- - 如果命中节点:则仅回复节点 id 数字(如命中多个节点,则仅回复最小值)
- - 如果未命中节点:则仅回复一个单词: None"""
- if need_judge:
- reply = self.reply(prompt, contact_wxid)
- else:
- reply = Reply()
- reply.content = "None"
- # logger.debug("[wxsop] reply: ?" % reply)
- if reply.content == "None" and backup_node_id != -1:
- reply.content = str(backup_node_id)
- if reply.content != "None":
- # 将 reply.content 从 str 转换为 int
- node_order = int(reply.content)
- # sop_nodes[node_order]['action_message']的值为 json str ,将其转换为字典
- messages = []
- forwards = []
- haveVar = False
- # 命中后发送的消息内容
- if sop_nodes[node_order]['action_message'] is not None:
- action_messages = json.loads(sop_nodes[node_order]['action_message'])
- for index, message in enumerate(action_messages):
- if message['content'] != "":
- type = int(message['type'])
- if type == 1:
- haveVar = self.contains_placeholder(message['content'])
- messages.append({
- "type": 1,
- "message": {
- "wxid": receiver,
- "msg": message['content']
- }
- })
- else:
- messages.append({
- "type": 2,
- "message": {
- "wxid": receiver,
- "filepath": message['content'],
- "diyfilename": message['meta']['filename']
- }
- })
- if len(messages) == 0:
- _ = create_message_record(cursor, 3, bot_wxid, message_record["contact_id"],
- contact_type,
- contact_wxid, 1, "", {}, 4,
- sop_nodes[node_order]["id"], 0, organization_id)
- # 命中后转发到人工
- if sop_nodes[node_order]['action_forward'] is not None:
- action_forward = json.loads(sop_nodes[node_order]['action_forward'])
- if action_forward['wxid'] != "":
- forward_wxids = self.split_string(action_forward['wxid'])
- for forward_wxid in forward_wxids:
- for index, message in enumerate(action_forward['action']):
- if message['content'] != "":
- stype = int(message['type'])
- if stype == 1:
- haveVar = self.contains_placeholder(message['content'])
- forwards.append({
- "type": 1,
- "message": {
- "wxid": forward_wxid,
- "msg": message['content']
- }
- })
- else:
- forwards.append({
- "type": 2,
- "message": {
- "wxid": forward_wxid,
- "filepath": message['content'],
- "diyfilename": message['meta']['filename']
- }
- })
- if haveVar:
- contactinfo = get_contact_by_wxid(receiver)
- else:
- contactinfo = {}
- for index, message in enumerate(messages):
- # 随机睡眠
- time.sleep(random.uniform(2.0, 5.0))
- if message["type"] == 1:
- if haveVar:
- message["message"]["msg"] = self.var_replace(message["message"]["msg"], contactinfo)
- robot.sendTextMsg(message["message"]["msg"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendTextMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- _ = create_message_record(cursor, 3, bot_wxid, message_record["contact_id"],
- contact_type,
- contact_wxid, message["type"],
- message['message']['msg'], {}, 4,
- sop_nodes[node_order]["id"], index, organization_id)
- else:
- robot.sendFileMsg(message["message"]["filepath"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendFileMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- _ = create_message_record(cursor, 3, bot_wxid, message_record["contact_id"],
- contact_type,
- contact_wxid, message["type"],
- message['message']['filepath'],
- {"filename": message['message']['diyfilename']}, 4,
- sop_nodes[node_order]["id"], index, organization_id)
- for message in forwards:
- # 随机睡眠
- time.sleep(random.uniform(2.0, 5.0))
- if message["type"] == 1:
- if haveVar:
- message["message"]["msg"] = self.var_replace(message["message"]["msg"], contactinfo)
- robot.sendTextMsg(message["message"]["msg"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendTextMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- else:
- robot.sendFileMsg(message["message"]["filepath"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendFileMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- action_label_add = json.loads(sop_nodes[node_order]['action_label_add'])
- action_label_del = json.loads(sop_nodes[node_order]['action_label_del'])
- action_label_add_list = json.loads(sop_nodes[node_order]['action_label_add_list'])
- action_label_del_list = json.loads(sop_nodes[node_order]['action_label_del_list'])
- if action_label_add or action_label_del:
- stages = get_stage(organization_id)
- self.add_tag(bot_wxid, message_record["contact_id"], contact_type, contact_wxid,
- action_label_add, action_label_del, action_label_add_list, action_label_del_list,
- stages, organization_id)
- connection.commit()
- return msg.content
- except Exception as e:
- # 回滚事务
- connection.rollback()
- print(f"发生错误: {e}")
- finally:
- # 确保资源被正确释放
- cursor.close()
- connection.close()
- # reply 询问大模型
- def reply(self, query, prompt: str = None, sender: str = None):
- messages = [{"role": "user", "content": prompt}]
- reply_content = self._client_reply(self.openAiClient, messages, sender)
- if reply_content is not None:
- reply = Reply(ReplyType.TEXT, reply_content)
- return reply
- else:
- return Reply(ReplyType.ERROR, "None")
- def contains_placeholder(s: str) -> bool:
- pattern = r'\$\{.*?\}'
- return bool(re.search(pattern, s))
- def var_replace(s: str, contactinfo: dict) -> str:
- s = s.replace("${nickname}", contactinfo["nickname"])
- return s
- def split_string(input_string):
- # 定义正则表达式,匹配中文逗号、英文逗号和顿号
- pattern = r'[,,、]'
- # 使用re.split方法分割字符串
- result = re.split(pattern, input_string)
- return result
- # 为联系人添加标签
- def add_tag(self, bot_wxid, contact_id, contact_type, contact_wxid, action_label_add,
- action_label_del, action_label_add_list, action_label_del_list, stages, organization_id):
- connection = init_new_db_connection()
- cursor = connection.cursor()
- robot = get_robot()
- contact_label_ids = add_contact_label(contact_id, action_label_add, action_label_del, action_label_add_list, action_label_del_list)
- match_stages = []
- # logger.debug("[wxsop] contact_label_ids: ?" % contact_label_ids)
- # logger.debug("[wxsop] stages: ?" % stages)
- for stage in stages:
- if stage["condition_type"] == 1:
- # logger.debug("[wxsop] stage: ?" % stage)
- if self.check_filter(stage, contact_label_ids):
- match_stages.append(stage)
- for stage in match_stages:
- is_message_send = check_message_record(cursor, contact_wxid, 3, stage["id"], 0)
- if is_message_send:
- continue
- messages = []
- forwards = []
- haveVar = False
- if stage["action_message"]:
- action_message = json.loads(stage['action_message'])
- for index, message in enumerate(action_message):
- if message['content'] != "":
- type = int(message['type'])
- if type == 1:
- haveVar = self.contains_placeholder(message['content'])
- messages.append({
- "type": 1,
- "message": {
- "wxid": contact_wxid,
- "msg": message['content']
- }
- })
- else:
- messages.append({
- "type": 2,
- "message": {
- "wxid": contact_wxid,
- "filepath": message['content'],
- "diyfilename": message['meta']['filename']
- }
- })
- if len(messages) == 0:
- _ = create_message_record(cursor,3, bot_wxid, contact_id,
- contact_type,
- contact_wxid, 1, "", {}, 3,
- stage["id"], 0, organization_id)
- if stage["action_forward"]:
- action_forward = json.loads(stage['action_forward'])
- if action_forward['wxid'] != "":
- forward_wxids = self.split_string(action_forward['wxid'])
- for forward_wxid in forward_wxids:
- for index, message in enumerate(action_forward['action']):
- if message['content'] != "":
- stype = int(message['type'])
- if stype == 1:
- haveVar = self.contains_placeholder(message['content'])
- forwards.append({
- "type": 1,
- "message": {
- "wxid": forward_wxid,
- "msg": message['content']
- }
- })
- # _ = wx_hook_request("/SendTextMsg", data, server['private_ip'], wxinfo['port'])
- else:
- forwards.append({
- "type": 2,
- "message": {
- "wxid": forward_wxid,
- "filepath": message['content'],
- "diyfilename": message['meta']['filename']
- }
- })
- if haveVar:
- contactinfo = get_contact_by_wxid(contact_wxid)
- else:
- contactinfo = {}
- for index, message in enumerate(messages):
- # 随机睡眠
- time.sleep(random.uniform(2.0, 5.0))
- if message["type"] == 1:
- if haveVar:
- message["message"]["msg"] = self.var_replace(message["message"]["msg"], contactinfo)
- robot.sendTextMsg(message["message"]["msg"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendTextMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- _ = create_message_record(cursor, 3, bot_wxid, contact_id,
- contact_type,
- contact_wxid, message["type"],
- message['message']['msg'], {}, 3,
- stage["id"], index,
- organization_id)
- else:
- robot.sendFileMsg(message["message"]["filepath"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendFileMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- #
- _ = create_message_record(cursor, 3, bot_wxid, contact_id,
- contact_type,
- contact_wxid, message["type"],
- message['message']['filepath'],
- {"filename": message['message'][
- 'diyfilename']}, 3,
- stage["id"], index,
- organization_id)
- for message in forwards:
- # 随机睡眠
- time.sleep(random.uniform(2.0, 5.0))
- if message["type"] == 1:
- if haveVar:
- message["message"]["msg"] = self.var_replace(message["message"]["msg"], contactinfo)
- robot.sendTextMsg(message["message"]["msg"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendTextMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- else:
- robot.sendFileMsg(message["message"]["filepath"], message["message"]["wxid"])
- # _ = wx_hook_request("/SendFileMsg", message["message"], server['private_ip'],
- # wxinfo['port'])
- if stage["action_label_add"] or stage["action_label_del"]:
- add_labels = json.loads(stage['action_label_add_list'])
- rem_labels = json.loads(stage['action_label_del_list'])
- self.add_tag(bot_wxid, contact_id, contact_type, contact_wxid, add_labels,
- rem_labels, stages, organization_id)
- def check_filter(filter, contact_label_ids):
- condition_operator = filter['condition_operator']
- condition_list = json.loads(filter['condition_list'])
- if condition_operator == 1:
- # All conditions must be met
- for condition in condition_list:
- if condition['equal'] == 1 and not set(condition['labelIdList']).issubset(set(contact_label_ids)):
- return False
- elif condition['equal'] == 2 and set(condition['labelIdList']).issubset(set(contact_label_ids)):
- return False
- return True
- elif condition_operator == 2:
- # Any condition can be met
- for condition in condition_list:
- if condition['equal'] == 1 and set(condition['labelIdList']).issubset(set(contact_label_ids)):
- return True
- elif condition['equal'] == 2 and not set(condition['labelIdList']).issubset(set(contact_label_ids)):
- return True
- return False
- class ReplyType(Enum):
- TEXT = 1 # 文本
- VOICE = 2 # 音频文件
- IMAGE = 3 # 图片文件
- IMAGE_URL = 4 # 图片URL
- VIDEO_URL = 5 # 视频URL
- FILE = 6 # 文件
- CARD = 7 # 微信名片,仅支持ntchat
- INVITE_ROOM = 8 # 邀请好友进群
- INFO = 9
- ERROR = 10
- TEXT_ = 11 # 强制文本
- VIDEO = 12
- MINIAPP = 13 # 小程序
- JSON_MULTIPLE_RESP = 14 # JSON多条回复数据
- LOCATION = 48 # 位置消息
- def __str__(self):
- return self.name
- class Reply:
- def __init__(self, stype: ReplyType = None, content=None):
- self.type = stype
- self.content = content
- def __str__(self):
- return "Reply(type={}, content={})".format(self.type, self.content)
|