from typing import Optional

import requests
from openai import OpenAI
from wcferry import WxMsg

from common.log import logger
from common.sql_lite import init_new_db_connection
from config import Config
from openai import OpenAI, AuthenticationError, APIConnectionError, APIError

from db.msg_history import msg_history_get, msg_history_update
from .plugin import Plugin

# Upper bound (seconds) for one knowledge-base search request, so that an
# unresponsive endpoint cannot block message handling forever.
_DATASET_SEARCH_TIMEOUT = 30


class Agent(Plugin):
    """Chat plugin that answers a message with an LLM.

    The reply is generated from a system prompt built out of the configured
    ``role`` and ``background``, the stored conversation history and, when a
    ``dataset_id`` is configured, question/answer pairs returned by the
    knowledge-base search endpoint.
    """

    def __init__(self):
        super().__init__()

    def answer(self, msg: WxMsg, wx_wxid: Optional[str] = None):
        """Generate a reply for ``msg`` and persist the updated history.

        Args:
            msg: Incoming message; ``content``, ``sender`` and ``roomid`` are
                read from it.
            wx_wxid: Account id passed through to the history storage helpers.

        Returns:
            The reply text, or ``None`` when any step failed (the transaction
            is rolled back and the error is logged in that case).
        """
        connection = init_new_db_connection()
        try:
            cursor = connection.cursor()
            try:
                history_messages = msg_history_get(
                    cursor, wx_wxid, msg.sender, msg.roomid
                )

                # Only consult the knowledge base when one is configured.
                knowledge = None
                dataset_id = self.config.get("dataset_id")
                if dataset_id:
                    query = self._expand_question(msg.content, history_messages)
                    knowledge = self._dataset_search(dataset_id, query)

                user_message = {"role": "user", "content": msg.content}
                messages = (
                    [
                        {
                            "role": "system",
                            "content": self._build_system_prompt(knowledge),
                        }
                    ]
                    + history_messages
                    + [user_message]
                )
                bot_reply = self._client_reply(self.openAiClient, messages)

                new_history_messages = history_messages + [
                    user_message,
                    {"role": "assistant", "content": bot_reply},
                ]
                msg_history_update(
                    cursor, wx_wxid, new_history_messages, msg.sender, msg.roomid
                )
                connection.commit()
                return bot_reply
            except Exception as e:
                # Roll back the transaction; keep the traceback in the log
                # instead of printing only the message.
                connection.rollback()
                logger.exception("发生错误: %s", e)
                return None
            finally:
                cursor.close()
        finally:
            # Release the connection even if creating the cursor failed.
            connection.close()

    def _expand_question(self, content, history_messages):
        """Ask the model to rewrite ``content`` as a self-contained question.

        The rewritten text is used as the knowledge-base search query.
        """
        expand_system_prompt = (
            "# 任务:\n"
            "请根据上下文信息,优化用户发送的最后一条消息,"
            "补齐消息中可能缺失的主语、谓语、宾语、定语、状语、补语句子成分。\n"
            f"# 用户发送的最后一条消息:{content}\n"
            "# 回复要求\n"
            "1. 直接输出优化后的消息"
        )
        expand_messages = [
            {"role": "system", "content": expand_system_prompt}
        ] + history_messages
        return self._client_reply(self.openAiClient, expand_messages)

    def _build_system_prompt(self, knowledge):
        """Assemble the system prompt from config and optional Q/A entries.

        ``knowledge`` is the list returned by ``_dataset_search`` (each entry
        is indexed with ``'q'`` and ``'a'``) or ``None``; the knowledge
        section is omitted when there are no entries.
        """
        role = self.config.get("role")
        background = self.config.get("background")
        parts = [
            "# 角色\n",
            f"{role}\n",
            "# 背景:\n",
            f"{background}\n",
        ]
        if knowledge:
            parts.append("# 相关知识:\n")
            parts.extend(
                f"问题:{item['q']}\n答案:{item['a']}\n" for item in knowledge
            )
        parts.append(
            "# 回复要求:\n"
            "1. 直接以角色设定的角度回答问题,并以第一人称输出。\n"
            "2. 不要在回复前加角色、姓名。\n"
            "3. 回复要正式"
        )
        return "".join(parts)

    def _dataset_search(self, dataset_id: str, text: str):
        """Query the knowledge-base search endpoint.

        Posts to the URL in config key ``dataset_base`` using the bearer token
        in ``dataset_key``.

        Args:
            dataset_id: Identifier of the dataset to search.
            text: Search query.

        Returns:
            The value of ``data.list`` from the JSON response, or ``None``
            when the request fails, the status is not 200, or the response
            does not have that shape.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f'Bearer {self.config.get("dataset_key")}',
        }
        payload = {
            "datasetId": dataset_id,
            "text": text,
            "limit": 100,
            "similarity": 0.8,
            "searchMode": "mixedRecall",
            "usingReRank": False,
        }
        try:
            response = requests.post(
                self.config.get("dataset_base"),
                headers=headers,
                json=payload,
                timeout=_DATASET_SEARCH_TIMEOUT,
            )
        except requests.RequestException:
            # A failed lookup means "no knowledge", same as a non-200 reply.
            logger.exception("dataset search request failed")
            return None

        if response.status_code != 200:
            return None

        try:
            body = response.json()
        except ValueError:
            # requests raises a ValueError subclass for non-JSON bodies.
            logger.exception("dataset search returned a non-JSON body")
            return None

        data = body.get("data") if isinstance(body, dict) else None
        if isinstance(data, dict) and "list" in data:
            return data["list"]
        return None