import asyncio import json from app.admin.schema.intent_org import CurrentIntentOrgIns from app.call_center.crud.crud_intent_records import intent_records_dao from app.call_center.schema.intent_records import GetIntentRecordsDetails from common.log import log from common.oai import generate_json, send_request_with_retry from database.db_mysql import async_db_session from model.intent_records import IntentRecords from utils.serializers import select_as_dict async def update_llm_intent(org_map: dict[int, CurrentIntentOrgIns], limit: int = 1): async with async_db_session.begin() as db: record = await intent_records_dao.get_earliest_record(db, limit) if not record: return 0 # # 从缓存中获取机构信息 # key = f'{settings.TOKEN_CALL_REDIS_PREFIX}:{1}' # org_json = await redis_client.get(key) # if not org_json: # # 缓存中没有,从数据库中获取 # from app.admin.crud.crud_intent_org import intent_org_dao # async with async_db_session.begin() as db: # org = await intent_org_dao.get(db, org_id) # if not org and org.status is not 1: # log.error(f"意向评级时,机构不存在 org_id: {org_id}") # return None # org_data = CurrentIntentOrgIns(**select_as_dict(org)) # # 将数据放进缓存 # await redis_client.setex( # key, # settings.JWT_USER_REDIS_EXPIRE_SECONDS, # org_data.model_dump_json(), # ) # else: # org_data = CurrentIntentOrgIns(**json.loads(org_json)) await asyncio.gather(*(process_llm_intent(org_map[r.org_id], r) for r in record)) return len(record) async def process_llm_intent(org_data: CurrentIntentOrgIns, record: IntentRecords): record_data = GetIntentRecordsDetails(**select_as_dict(record)) log.info(f"[process_llm_intent] record_data: {record_data}") # 机构 id # org_id = record_data.org_id # 开始评级 if org_data.model == "deepseek-v3" or org_data.model == "DeepSeek-V3": intent_schema = """{ "score": str, #评分结果:有意向、待进一步分析、暂时无法沟通、其他 "confidence_score": int, #置信度分数,范围从0.0到1.0 "scoring_criteria": str, #请逐步介绍为何评为这个结果 }""" else: intent_schema = { "name": "intent_schema", "schema": { # 添加 schema 字段 "type": "object", "description": "为通话记录进行评级", "properties": { "score": {"type": "string", "description": "评分结果:有意向、待进一步分析、暂时无法沟通、其他"}, "confidence_score": {"type": "number", "description": "置信度分数,范围从0.0到1.0"}, "scoring_criteria": {"type": "string", "description": "请逐步介绍为何评为这个结果"} }, "required": ["score", "confidence_score", "scoring_criteria"] } } rules = [ { 'rule': "用户表现出对产品有积极回应,询问有关产品的问题,或愿意进行后续操作,如确认微信号、询问发货地址或表达接受产品(例:'行'、'好'、'可以'、'是的')。", 'score': '有意向'}, { 'rule': "用户虽然未积极回应或表示不需要,但却有相关需求、或着符合某项购买条件、又或着询问了机器人问题,即便后期疲软或持否定态度,可移交销售顾问进行进一步的需求分析,以便改善产品或定制符合客户要求的产品", 'score': '待进一步分析'}, { 'rule': "用户表示自己在忙、在开会等,现在不太方便沟通。又或者对电话的真实性存疑,怀疑是诈骗电话,可后续让专业客服继续跟进", 'score': '暂时无法沟通'}, { 'rule': "除上述规则外的其他用户,包括:用户未回复直接挂机、用户拒绝后直接挂机的情况。请注意客户资源珍贵,请尽量归类为'有意向'或'待进一步分析',后续再由销售专家定性。", 'score': '其他'} ] score_map = { '有意向': 1, '待进一步分析': 2, '暂时无法沟通': 3, '其他': 4 } intent_map = { '有意向': "有意向", '待进一步分析': "无法判定", '暂时无法沟通': "在忙", '其他': "无意向" } messages = [ {"role": "system", "content": f"""# 任务 1. 首先,判断用户的第一句话是否说了:“你好,(任意内容)通话”,如果说了,则不用理会评级规则,直接强制分配为"语音助手" 2. 如果不属于“语音助手”,请根据评级规则,对聊天记录给出评级、置信度、评分依据(逐项分析不要遗漏) # 细节说明 置信度从0到1,0为置信度最低,1为置信度最高。""" }, { "role": "user", "content": f"""# 评级规则: {rules} # 聊天记录 {record_data.chat_history} """ } ] response_json, response_data = await generate_json(org_data.openai_key, org_data.openai_base, org_data.model, messages, intent_schema) if response_json: score = response_json.get('score', "未知") llm_intent = score_map.get(score, 4) if llm_intent != 0: # confidence_score = response_json.get('confidence_score', 0) # scoring_criteria = response_json.get('scoring_criteria', "未知") status = 1 # 推送 url = org_data.intent_callback log.error(f"intent_callback: {url}") if url: headers = { "Content-Type": "application/json" } data = { "internal_id": record_data.id, "external_id": record_data.external_id, "score": llm_intent, "intent": intent_map.get(score, "无法判定") } is_success = await send_request_with_retry(url, data, headers, max_retries=3, delay_between_retries=2) if is_success: status = 2 async with async_db_session() as session: async with session.begin(): try: await intent_records_dao.update_llm_intent(session, record_data.id, llm_intent, {"messages": messages}, response_data.to_dict(), status, org_data.model, response_data.usage.prompt_tokens, response_data.usage.completion_tokens) except Exception as e: log.error(f"更新意图记录时发生异常:{e}")