123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201 |
- import json
- import asyncio
- from pydantic import BaseModel
- from app.admin.schema.intent_org import CurrentIntentOrgIns
- from app.call_center.crud.crud_mismatch_records import mismatch_records_dao
- from app.call_center.schema.mismatch_records import GetMismatchRecordsDetails
- from common.log import log
- from common.oai import send_request_with_retry, generate_text, generate_json
- from database.db_mysql import async_db_session
- from model.mismatch_records import MismatchRecords
- from utils.serializers import select_as_dict
- # class Keyword(BaseModel):
- # user_intent: str
- # similar_reply: list[str]
- # keywords: list[str]
- # regular: list[str]
- async def update_mismatch_record(org_map: dict[int, CurrentIntentOrgIns], limit: int = 1):
- async with async_db_session.begin() as db:
- record = await mismatch_records_dao.get_earliest_record(db, limit)
- if not record:
- return 0
- await asyncio.gather(*(process_mismatch_record(org_map[r.org_id], r) for r in record))
- return len(record)
- async def process_mismatch_record(org_data: CurrentIntentOrgIns, record: MismatchRecords):
- record_data = GetMismatchRecordsDetails(**select_as_dict(record))
- log.info(f"[process_mismatch_record] record_data: {record_data}")
- # 开始过滤
- llm_ignore = record_data.llm_ignore # 如后续人工强制强制分析,则保留原始过滤结果
- ignore_response_data = None
- ignore_model = None
- ignore_prompt_tokens = None
- ignore_completion_tokens = None
- if record_data.ignore == 0:
- ignore_messages = [
- {"role": "system", "content": f"""# 任务介绍
- 公司在用语音识别系统巡检电话通话记录时,可能会因为通话时的背景噪音、方言等问题,导致用户的语音内容没有被正确识别,请根据以下通话记录,判断识别到的内容是否无意义或使上下文不通顺,非常感谢!
- # 注意事项
- 不用考虑机器人的回复,因为即便识别正确,机器人的回复也可能有误
- # 输出要求
- 1. 如放在上下文中无意义,请回复一个数字0
- 2. 如放在上下文中有意义,请回复一个数字1
- 3. 请不要回复 0 或 1 以外的文字内容"""
- },
- {
- "role": "user",
- "content": f"""# 通话记录
- {record_data.chat_history}
- # 可能识别有误的内容:{record_data.missed}
- """
- }
- ]
- ignore_model = "gpt-3.5-turbo"
- if org_data.model == "deepseek-v3" or org_data.model == "DeepSeek-V3":
- ignore_model = org_data.model
- ignore_content, ignore_response_data = await generate_text(org_data.openai_key, org_data.openai_base, ignore_model, ignore_messages)
- if ignore_content:
- ignore_prompt_tokens = ignore_response_data.usage.prompt_tokens
- ignore_completion_tokens = ignore_response_data.usage.completion_tokens
- if ignore_content == "0":
- status = 1
- llm_ignore = 1
- # 推送
- url = org_data.mismatch_callback
- if url:
- headers = {
- "Content-Type": "application/json"
- }
- data = {
- "internal_id": record_data.id,
- "external_id": record_data.external_id,
- "ignore": record_data.ignore,
- "llm_ignore": llm_ignore
- }
- is_success = await send_request_with_retry(url, data, headers, max_retries=3,
- delay_between_retries=2)
- if is_success:
- status = 2
- async with async_db_session.begin() as db:
- try:
- await mismatch_records_dao.update_llm_ignore(db, record_data.id, llm_ignore,
- ignore_response_data.to_dict(),
- status, ignore_model, ignore_prompt_tokens, ignore_completion_tokens)
- except Exception as e:
- log.error(f"更新意图记录时发生异常:{e}")
- finally:
- return None
- elif ignore_content == "1":
- llm_ignore = 2
- ignore_response_data = ignore_response_data.to_dict()
- else:
- return None
- # 开始分析
- if org_data.model == "deepseek-v3" or org_data.model == "DeepSeek-V3":
- keyword_schema = """{
- "user_intent": str, #用户意图
- "similar_reply": list[str], #类似回复
- "keywords": list[str], #关键词库
- "regular": list[str], #正则表达式
- }"""
- else:
- keyword_schema = {
- "name": "keyword_schema",
- "schema": { # 添加 schema 字段
- "type": "object",
- "description": "从通话记录中提取表单值",
- "properties": {
- "user_intent": {"type": "string", "description": "用户意图"},
- "similar_reply": {
- "type": "array",
- "description": "类似回复",
- "items": {"type": "string"}
- },
- "keywords": {
- "type": "array",
- "description": "关键词库",
- "items": {"type": "string"}
- },
- "regular": {
- "type": "array",
- "description": "正则表达式",
- "items": {"type": "string"}
- },
- },
- "required": ["user_intent", "similar_reply", "keywords", "regular"]
- }
- }
- keyword_messages = [
- {"role": "system", "content": f"""# 任务介绍
- 公司在用程序巡检电话通话记录时,有些用户的回复内容没有被关键词库识别到,请根据以下通话记录,给出一些类似的回复,以及对关键词库的具体建议,非常感谢!
- # 输出要求
- 1. 用户意图:结合上下文,首先考虑可能的语音识别错误并纠错,然后分析未识别内容的根本意图,用户说这句话的背后真实目的是什么,结合上下文挖掘用户最深的意图,请不要停留在表面的意思。
- 2. 类似回复:首先应包含未识别的内容原文:{record_data.missed}。其次生成尽可能多的类似意图的回复。
- 2. 关键词库:从类似回复中抽取特征词,要求有两点:一是特征词应尽可能的准确,即要覆盖住此类问题,又要尽可能避免和其他语境相冲突。二是需要注重通用性,一些专有名词、人名、地名、产品名、公司名等需要排除在外。
- 3. 正则表达式:对关键词词进行解耦,用正则表达式进行表示,专注于主要内容,排除次要内容
- # 注意事项
- 1. 应贴近真实的外呼场景,用户的素质参差不齐,请避免使用任何浮夸的用词,避免使用高级词汇,避免使用任何的礼貌用语或敬语,适当的低素质些,请至少给出20条结果
- 2. 通话记录为录音转译,因此可能有错别字或音同字不同的情况(例如:借和接),请根据上下文分析后就成可能的错字错误
- 3. 正则关键词库和正则表达式中也应该考虑到音同字不同的情况,且避免使用匹配次数相关的语法如{0,2}"""
- },
- {
- "role": "user",
- "content": f"""# 通话记录
- {record_data.chat_history}
- # 可能识别有误的内容:{record_data.missed}
- """
- }
- ]
- response_json, response_data = await generate_json(org_data.openai_key, org_data.openai_base, org_data.model, keyword_messages, keyword_schema)
- if response_json:
- user_intent = response_json.get('user_intent')
- similar_reply = response_json.get('similar_reply')
- keywords = response_json.get('keywords', [])
- regular = response_json.get('regular', [])
- if user_intent and similar_reply:
- status = 1
- # 推送
- url = org_data.mismatch_callback
- if url:
- headers = {
- "Content-Type": "application/json"
- }
- data = {
- "internal_id": record_data.id,
- "external_id": record_data.external_id,
- "ignore": record_data.ignore,
- "llm_ignore": llm_ignore,
- "user_intent": user_intent,
- "similar_reply": similar_reply,
- "keywords": keywords,
- "regular": regular
- }
- is_success = await send_request_with_retry(url, data, headers, max_retries=3, delay_between_retries=2)
- if is_success:
- status = 2
- async with async_db_session.begin() as db:
- try:
- await mismatch_records_dao.update(db, record_data.id, llm_ignore, user_intent, similar_reply, keywords, regular, {"messages": keyword_messages}, response_data.to_dict(), status, ignore_response_data, ignore_model, ignore_prompt_tokens, ignore_completion_tokens, org_data.model, response_data.usage.prompt_tokens, response_data.usage.completion_tokens)
- except Exception as e:
- log.error(f"更新意图记录时发生异常:{e}")
|