update_llm_intent.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import json
  2. from time import sleep
  3. from typing import Dict
  4. import requests
  5. from openai import OpenAI
  6. import asyncio
  7. import aiohttp
  8. from app.admin.schema.intent_org import CurrentIntentOrgIns
  9. from app.call_center.crud.crud_intent_records import intent_records_dao
  10. from app.call_center.schema.intent_records import GetIntentRecordsDetails
  11. from common.log import log
  12. from core.conf import settings
  13. from database.db_mysql import async_db_session
  14. from database.db_redis import redis_client
  15. from utils.serializers import select_as_dict
  16. async def update_llm_intent():
  17. async with async_db_session.begin() as db:
  18. record = await intent_records_dao.get_earliest_record(db)
  19. if not record:
  20. return None
  21. record_data = GetIntentRecordsDetails(**select_as_dict(record))
  22. # 机构 id
  23. org_id = record_data.org_id
  24. # 从缓存中获取机构信息
  25. key = f'{settings.TOKEN_CALL_REDIS_PREFIX}:{org_id}'
  26. org_json = await redis_client.get(key)
  27. if not org_json:
  28. # 缓存中没有,从数据库中获取
  29. from app.admin.crud.crud_intent_org import intent_org_dao
  30. org = await intent_org_dao.get(db, org_id)
  31. if not org and org.status is not 1:
  32. log.error(f"意向评级时,机构不存在 org_id: {org_id}")
  33. return None
  34. org_data = CurrentIntentOrgIns(**select_as_dict(org))
  35. # 将数据放进缓存
  36. await redis_client.setex(
  37. key,
  38. settings.JWT_USER_REDIS_EXPIRE_SECONDS,
  39. org_data.model_dump_json(),
  40. )
  41. else:
  42. org_data = CurrentIntentOrgIns(**json.loads(org_json))
  43. # 开始评级
  44. intent_schema = {
  45. "name": "intent_schema",
  46. "schema": { # 添加 schema 字段
  47. "type": "object",
  48. "description": "为通话记录进行评级",
  49. "properties": {
  50. "score": {"type": "string", "description": "评分结果:有意向、待进一步分析、暂时无法沟通、其他"},
  51. "confidence_score": {"type": "number", "description": "置信度分数,范围从0.0到1.0"},
  52. "scoring_criteria": {"type": "string", "description": "请逐步介绍为何评为这个结果"}
  53. },
  54. "required": ["score", "confidence_score", "scoring_criteria"]
  55. }
  56. }
  57. rules = [
  58. {
  59. 'rule': "用户表现出对产品有积极回应,询问有关产品的问题,或愿意进行后续操作,如确认微信号、询问发货地址或表达接受产品(例:'行'、'好'、'可以'、'是的')。",
  60. 'score': '有意向'},
  61. {
  62. 'rule': "用户虽然未积极回应或表示不需要,但却有相关需求、或着符合某项购买条件、又或着询问了机器人问题,即便后期疲软或持否定态度,可移交销售顾问进行进一步的需求分析,以便改善产品或定制符合客户要求的产品",
  63. 'score': '待进一步分析'},
  64. {
  65. 'rule': "用户表示自己在忙、在开会等,现在不太方便沟通。又或者对电话的真实性存疑,怀疑是诈骗电话,可后续让专业客服继续跟进",
  66. 'score': '暂时无法沟通'},
  67. {
  68. 'rule': "除上述规则外的其他用户,请注意客户资源珍贵,请尽量归类为'有意向'或'待进一步分析',后续再由销售专家定性。",
  69. 'score': '其他'}
  70. ]
  71. score_map = {
  72. '有意向': 1,
  73. '待进一步分析': 2,
  74. '暂时无法沟通': 3,
  75. '其他': 4
  76. }
  77. messages = [
  78. {"role": "system", "content": f"""# 任务
  79. 1. 首先,判断用户的第一句话是否说了:“你好,(任意内容)通话”,如果说了,则不用理会评级规则,直接强制分配为"语音助手"
  80. 2. 如果不属于“语音助手”,请根据评级规则,对聊天记录给出评级、置信度、评分依据(逐项分析不要遗漏)
  81. # 细节说明
  82. 置信度从0到1,0为置信度最低,1为置信度最高。"""
  83. },
  84. {
  85. "role": "user",
  86. "content": f"""# 评级规则:
  87. {rules}
  88. # 聊天记录
  89. {record_data.chat_history}
  90. """
  91. }
  92. ]
  93. response_data = generate_json(org_data.openai_key, org_data.openai_base, messages, intent_schema)
  94. if response_data and isinstance(response_data.choices, list) and len(response_data.choices) > 0:
  95. first_choice = response_data.choices[0]
  96. if first_choice and first_choice.message:
  97. response_json = first_choice.message.content
  98. if response_json:
  99. intent = json.loads(response_json)
  100. score = intent.get('score', "未知")
  101. llm_intent = score_map.get(score, 0)
  102. # confidence_score = intent.get('confidence_score', 0)
  103. # scoring_criteria = intent.get('scoring_criteria', "未知")
  104. log.info(f"response_data.to_dict(): {response_data.to_dict()}")
  105. status = 2
  106. # 推送
  107. url = org_data.intent_callback
  108. if url:
  109. headers = {
  110. "Content-Type": "application/json"
  111. }
  112. data = {
  113. "internal_id": record_data.id,
  114. "external_id": record_data.external_id,
  115. "score": llm_intent,
  116. "intent": score
  117. }
  118. is_success = await send_request_with_retry(url, data, headers, max_retries=3, delay_between_retries=2)
  119. if is_success:
  120. status = 3
  121. # 存储结果
  122. status = 2
  123. async with async_db_session.begin() as db:
  124. try:
  125. await intent_records_dao.update_llm_intent(db, record_data.id, llm_intent,
  126. {"messages": messages},
  127. response_data.to_dict(),
  128. status)
  129. except Exception as e:
  130. log.error(f"更新意图记录时发生异常:{e}")
  131. def generate_json(api_key: str, openai_base: str, messages: list[dict], json_schema: dict):
  132. try:
  133. client_args = {}
  134. if api_key:
  135. client_args["api_key"] = api_key
  136. if openai_base:
  137. client_args["base_url"] = openai_base
  138. oai_client = OpenAI(**client_args)
  139. completion = oai_client.chat.completions.create(
  140. model="gpt-4o",
  141. messages=messages,
  142. response_format={
  143. "type": "json_schema",
  144. "json_schema": json_schema
  145. }
  146. )
  147. if completion and isinstance(completion.choices, list) and len(completion.choices) > 0:
  148. first_choice = completion.choices[0]
  149. if first_choice and first_choice.message:
  150. # return first_choice.message.content
  151. return completion
  152. except Exception as e:
  153. log.error(f"[oai] generate_json failed: {e}")
  154. async def send_request_with_retry(url: str, data: Dict, headers: Dict[str, str], max_retries: int, delay_between_retries: int) -> bool:
  155. for attempt in range(max_retries):
  156. try:
  157. async with session.ClientSession() as session:
  158. async with session.post(url, json=data, headers=headers, timeout=10) as response:
  159. if response.status == 200:
  160. return True
  161. except (aiohttp.ClientError, asyncio.TimeoutError) as e:
  162. log.error(f"请求异常:{e}")
  163. if attempt < max_retries - 1:
  164. print("重试中...")
  165. await asyncio.sleep(delay_between_retries)
  166. return False