123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- import json
- from time import sleep
- from typing import Dict
- import re
- from openai import OpenAI
- import asyncio
- import aiohttp
- from pydantic import BaseModel
- from app.admin.schema.org import CurrentIntentOrgIns
- from app.call_center.crud.crud_intent_records import intent_records_dao
- from app.call_center.crud.crud_mismatch_records import mismatch_records_dao
- from app.call_center.schema.intent_records import GetIntentRecordsDetails
- from app.call_center.schema.mismatch_records import GetMismatchRecordsDetails
- from common.log import log
- from common.oai import send_request_with_retry, generate_json, generate_text, generate_json_by_class
- from core.conf import settings
- from database.db_mysql import async_db_session
- from database.db_redis import redis_client
- from utils.serializers import select_as_dict
- class Keyword(BaseModel):
- user_intent: str
- similar_reply: list[str]
- keywords: list[str]
- regular: list[str]
- async def update_mismatch_record():
- async with async_db_session.begin() as db:
- record = await mismatch_records_dao.get_earliest_record(db)
- if not record:
- return None
- record_data = GetMismatchRecordsDetails(**select_as_dict(record))
- log.info(f"[oai] record_data: {record_data}")
- log.info(f"[oai] record_data.missed: {record_data.missed}")
- # 机构 id
- org_id = record_data.org_id
- # 从缓存中获取机构信息
- key = f'{settings.TOKEN_CALL_REDIS_PREFIX}:{org_id}'
- org_json = await redis_client.get(key)
- if not org_json:
- # 缓存中没有,从数据库中获取
- from app.admin.crud.crud_org import org_dao
- org = await org_dao.get(db, org_id)
- if not org and org.status is not 1:
- log.error(f"意向评级时,机构不存在 org_id: {org_id}")
- return None
- log.info(f"org: {org}")
- org_data = CurrentIntentOrgIns(**select_as_dict(org))
- # 将数据放进缓存
- await redis_client.setex(
- key,
- settings.JWT_USER_REDIS_EXPIRE_SECONDS,
- org_data.model_dump_json(),
- )
- else:
- org_data = CurrentIntentOrgIns(**json.loads(org_json))
- # 开始过滤
- llm_ignore = record_data.llm_ignore
- ignore_response_data = None
- if record_data.ignore == 0:
- ignore_messages = [
- {"role": "system", "content": f"""# 任务介绍
- 公司在用语音识别系统巡检电话通话记录时,可能会因为通话时的背景噪音、方言等问题,导致用户的语音内容没有被正确识别,请根据以下通话记录,判断识别到的内容是否无意义或使上下文不通顺,非常感谢!
- # 注意事项
- 不用考虑机器人的回复,因为即便识别正确,机器人的回复也可能有误
- # 输出要求
- 1. 如放在上下文中无意义,请回复一个数字0
- 2. 如放在上下文中有意义,请回复一个数字1
- 3. 请不要回复 0 或 1 以外的文字内容"""
- },
- {
- "role": "user",
- "content": f"""# 通话记录
- {record_data.chat_history}
- # 可能识别有误的内容:{record_data.missed}
- """
- }
- ]
- filter_model = "gpt-3.5-turbo"
- if org_data.model == "DeepSeek-V3":
- filter_model = org_data.model
- ignore_response_data = generate_text(org_data.openai_key, org_data.openai_base, filter_model, ignore_messages)
- if ignore_response_data and isinstance(ignore_response_data.choices, list) and len(ignore_response_data.choices) > 0:
- first_choice = ignore_response_data.choices[0]
- if first_choice and first_choice.message:
- ignore_content = first_choice.message.content
- if ignore_content == "0":
- status = 1
- llm_ignore = 1
- # 推送
- url = org_data.mismatch_callback
- log.error(f"url: {url}")
- if url:
- headers = {
- "Content-Type": "application/json"
- }
- data = {
- "internal_id": record_data.id,
- "external_id": record_data.external_id,
- "ignore": record_data.ignore,
- "llm_ignore": llm_ignore
- }
- is_success = await send_request_with_retry(url, data, headers, max_retries=3,
- delay_between_retries=2)
- if is_success:
- status = 2
- async with async_db_session.begin() as db:
- try:
- await mismatch_records_dao.update_llm_ignore(db, record_data.id, llm_ignore,
- ignore_response_data.to_dict(),
- status)
- except Exception as e:
- log.error(f"更新意图记录时发生异常:{e}")
- finally:
- return None
- elif ignore_content == "1":
- llm_ignore = 2
- ignore_response_data = ignore_response_data.to_dict()
- # 开始分析
- if org_data.model == "gpt-4o":
- keyword_messages = [
- {"role": "system", "content": f"""# 任务介绍
- 公司在用程序巡检电话通话记录时,有些用户的回复内容没有被关键词库识别到,请根据以下通话记录,给出一些类似的回复,以及对关键词库的具体建议,非常感谢!
-
- # 输出要求
- 1. 用户意图:结合上下文,首先考虑可能的语音识别错误并纠错,然后分析未识别内容的根本意图,用户说这句话的背后真实目的是什么,结合上下文挖掘用户最深的意图,请不要停留在表面的意思。
- 2. 类似回复:首先应包含未识别的内容原文:{record_data.missed}。其次生成尽可能多的类似意图的回复。
- 2. 关键词库:从类似回复中抽取特征词,要求有两点:一是特征词应尽可能的准确,即要覆盖住此类问题,又要尽可能避免和其他语境相冲突。二是需要注重通用性,一些专有名词、人名、地名、产品名、公司名等需要排除在外。
- 3. 正则表达式:对关键词词进行解耦,用正则表达式进行表示,专注于主要内容,排除次要内容
-
- # 注意事项
- 1. 应贴近真实的外呼场景,用户的素质参差不齐,请避免使用任何浮夸的用词,避免使用高级词汇,避免使用任何的礼貌用语或敬语,适当的低素质些,请至少给出20条结果
- 2. 通话记录为录音转译,因此可能有错别字或音同字不同的情况(例如:借和接),请根据上下文分析后就成可能的错字错误
- 3. 正则关键词库和正则表达式中也应该考虑到音同字不同的情况,且避免使用匹配次数相关的语法如{0,2}"""
- },
- {
- "role": "user",
- "content": f"""# 通话记录
- {record_data.chat_history}
-
- # 可能识别有误的内容:{record_data.missed}
- """
- }
- ]
- response_data = generate_json_by_class(org_data.openai_key, org_data.openai_base, org_data.model, keyword_messages, Keyword)
- else:
- keyword_messages = [
- {"role": "system", "content": f"""# 任务介绍
- 公司在用程序巡检电话通话记录时,有些用户的回复内容没有被关键词库识别到,请根据以下通话记录,给出一些类似的回复,以及对关键词库的具体建议,非常感谢!
- # 输出要求
- 1. 用户意图:结合上下文,首先考虑可能的语音识别错误并纠错,然后分析未识别内容的根本意图,用户说这句话的背后真实目的是什么,结合上下文挖掘用户最深的意图,请不要停留在表面的意思。
- 2. 类似回复:首先应包含未识别的内容原文:{record_data.missed}。其次生成尽可能多的类似意图的回复。
- 2. 关键词库:从类似回复中抽取特征词,要求有两点:一是特征词应尽可能的准确,即要覆盖住此类问题,又要尽可能避免和其他语境相冲突。二是需要注重通用性,一些专有名词、人名、地名、产品名、公司名等需要排除在外。
- 3. 正则表达式:对关键词词进行解耦,用正则表达式进行表示,专注于主要内容,排除次要内容
- # 注意事项
- 1. 应贴近真实的外呼场景,用户的素质参差不齐,请避免使用任何浮夸的用词,避免使用高级词汇,避免使用任何的礼貌用语或敬语,适当的低素质些,请至少给出20条结果
- 2. 通话记录为录音转译,因此可能有错别字或音同字不同的情况(例如:借和接),请根据上下文分析后就成可能的错字错误
- 3. 正则关键词库和正则表达式中也应该考虑到音同字不同的情况,且避免使用匹配次数相关的语法如{{0, 2}}"""
- },
- {
- "role": "user",
- "content": f"""# 通话记录
- {record_data.chat_history}
- # 可能识别有误的内容:{record_data.missed}
-
- # 请以下方的json结构输出
- {{
- user_intent: str
- similar_reply: list[str]
- keywords: list[str]
- regular: list[str]
- }}
- """
- }
- ]
- response_data = generate_json_by_class(org_data.openai_key, org_data.openai_base, org_data.model, keyword_messages, { "type": "json_object" })
- log.info(f"response_data: {response_data}")
- log.info(f"response_data.message: {response_data.choices[0].message.content}")
- # log.info(f"response_data.message.content: {response_data.message.content}")
- if response_data and isinstance(response_data.choices, list) and len(response_data.choices) > 0:
- first_choice = response_data.choices[0]
- if first_choice and first_choice.message:
- response_json = first_choice.message.content
- if response_json:
- match = re.search(r'\{.*\}', response_json, re.DOTALL)
- if not match:
- return None
- json_str = match.group(0)
- intent = json.loads(json_str)
- user_intent = intent.get('user_intent')
- similar_reply = intent.get('similar_reply')
- keywords = intent.get('keywords', [])
- regular = intent.get('regular', [])
- if user_intent and similar_reply:
- status = 1
- # 推送
- url = org_data.mismatch_callback
- log.error(f"url: {url}")
- if url:
- headers = {
- "Content-Type": "application/json"
- }
- data = {
- "internal_id": record_data.id,
- "external_id": record_data.external_id,
- "ignore": record_data.ignore,
- "llm_ignore": llm_ignore,
- "user_intent": user_intent,
- "similar_reply": similar_reply,
- "keywords": keywords,
- "regular": regular
- }
- is_success = await send_request_with_retry(url, data, headers, max_retries=3, delay_between_retries=2)
- if is_success:
- status = 2
- async with async_db_session.begin() as db:
- try:
- await mismatch_records_dao.update(db, record_data.id, llm_ignore, user_intent, similar_reply, keywords, regular, {"messages": keyword_messages}, response_data.to_dict(), status, ignore_response_data)
- except Exception as e:
- log.error(f"更新意图记录时发生异常:{e}")
|