sop_service.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import threading
  2. import time
  3. import json
  4. import re
  5. import requests
  6. from common.sql_lite import init_new_db_connection
  7. from service.robot import get_robot
  8. from config import conf
  9. from db.sop import sop_node_get_list_by_condition
  10. from db.message_records import get_message_by_condition, get_message_by_contact_new_one, create_message_record
# Run flag for the SOP worker: start_sop_task() sets it, stop_sop_task() clears
# it, and SopTaskService.execute_task() keeps looping only while it is set.
sop_task_event = threading.Event()
  12. class SopTaskService:
  13. def __init__(self):
  14. self._robot = get_robot()
  15. self._wx_wxid = self._robot.wcf.get_self_wxid()
  16. self._execute_task_info = None
  17. def execute_task(self):
  18. connection = init_new_db_connection()
  19. try:
  20. while sop_task_event.is_set():
  21. # 查询最早的一条待执行或正在执行的任务
  22. cursor = connection.cursor()
  23. try:
  24. sop_nodes = sop_node_get_list_by_condition(cursor)
  25. for node in sop_nodes:
  26. start_time = time.time()
  27. if node['no_reply_unit'] == 'W':
  28. start_time = time.time() - 60 * 24 * 7 * 60 - 60 * 2
  29. elif node['no_reply_unit'] == 'D':
  30. start_time = time.time() - 60 * 24 * 60 - 60 * 2
  31. elif node['no_reply_unit'] == 'h':
  32. start_time = time.time() - 60 * 60 - 60 * 2
  33. end_time = start_time + 60 * 2
  34. if node['parent_id'] == 0:
  35. message_records = get_message_by_condition(cursor, 3, 3, node['stage_id'], 0, start_time, end_time)
  36. else:
  37. message_records = get_message_by_condition(cursor, 3, 4, node['parent_id'], 0, start_time, end_time)
  38. for s in message_records:
  39. latest = get_message_by_contact_new_one(cursor, s['contact_id'], 3)
  40. if latest['id'] == s['id']:
  41. # 处理 action_message
  42. if node['action_message'] is not None:
  43. action_messages = json.loads(s['action_message'])
  44. for idx, ac in enumerate(action_messages):
  45. if ac['meta'] is not None:
  46. meta = json.dumps({
  47. "wxid": s['bot_wxid'],
  48. "filepath": ac['content'],
  49. "diyfilename": ac['filename']
  50. })
  51. _ = create_message_record(cursor,
  52. 1,
  53. s['bot_wxid'],
  54. s['contact_id'],
  55. s['contact_type'],
  56. s['contact_wxid'],
  57. ac['content_type'],
  58. ac['content'],
  59. meta,
  60. 4,
  61. node['id'],
  62. idx,
  63. s['organization_id'])
  64. else:
  65. _ = create_message_record(cursor,
  66. 1,
  67. s['bot_wxid'],
  68. s['contact_id'],
  69. s['contact_type'],
  70. s['contact_wxid'],
  71. 1,
  72. '',
  73. {},
  74. 4,
  75. node['id'],
  76. 0,
  77. s['organization_id'])
  78. # 处理 action_forward
  79. if node['action_forward'] is not None:
  80. action_forward = json.loads(node['action_forward'])
  81. if action_forward['wxid'] != "":
  82. forward_wxids = self.split_string(action_forward['wxid'])
  83. for forward_wxid in forward_wxids:
  84. for idx, message in enumerate(action_forward['action']):
  85. meta = json.dumps({
  86. "wxid": s['bot_wxid'],
  87. "filepath": message['content'],
  88. "diyfilename": message['filename']
  89. })
  90. _ = create_message_record(cursor,
  91. 1,
  92. s['bot_wxid'],
  93. 0,
  94. 0,
  95. forward_wxid,
  96. message['type'],
  97. message['content'],
  98. meta,
  99. 4,
  100. node['id'],
  101. s['contact_id'] + idx,
  102. s['organization_id'])
  103. except Exception as e:
  104. print(f"发生错误: {e}")
  105. finally:
  106. pass
  107. connection.commit()
  108. except Exception as e:
  109. # 回滚事务
  110. connection.rollback()
  111. print(f"发生错误: {e}")
  112. finally:
  113. # 确保资源被正确释放
  114. cursor.close()
  115. # 解析输入内容
  116. def split_string(input_string):
  117. # 定义正则表达式,匹配中文逗号、英文逗号和顿号
  118. pattern = r'[,,、]'
  119. # 使用re.split方法分割字符串
  120. result = re.split(pattern, input_string)
  121. return result
  122. def start_sop_task():
  123. global sop_task_event
  124. print("Start SOP task.")
  125. # 设置事件为 True
  126. sop_task_event.set()
  127. threading.Thread(target=SopTaskService().execute_task).start()
  128. def stop_sop_task():
  129. global sop_task_event
  130. # 清除事件
  131. sop_task_event.clear()
  132. print("Stop SOP task.")
  133. # 从PC接口获取 sop_task 列表
  134. def get_sop_task_list():
  135. url = "https://wxadminapi.gkscrm.com/wechat-api/api/sop_task/list"
  136. token = str(conf().get("token"))
  137. payload = json.dumps({
  138. 'page': 1,
  139. 'pageSize': 1000,
  140. })
  141. headers = {
  142. 'Content-Type': 'application/json',
  143. 'Authorization': token,
  144. }
  145. response = requests.request("POST", url, headers=headers, data=payload)
  146. if response.status_code == 200:
  147. resp = json.loads(response.text)
  148. if resp is not None and 'code' in resp and resp['code'] == 0:
  149. if 'total' in resp['data'] and resp['data']['total'] > 0:
  150. return resp['data']['data']
  151. return None
  152. # 从PC接口获取 sop_stage 列表
  153. def get_sop_stage_list(sop_task_ids):
  154. url = "https://wxadminapi.gkscrm.com/wechat-api/api/sop_stage/list"
  155. token = str(conf().get("token"))
  156. payload = json.dumps({
  157. 'taskIds': sop_task_ids,
  158. })
  159. headers = {
  160. 'Content-Type': 'application/json',
  161. 'Authorization': token,
  162. }
  163. response = requests.request("POST", url, headers=headers, data=payload)
  164. print(f"get_sop_stage_list = {response.text}")
  165. if response.status_code == 200:
  166. resp = json.loads(response.text)
  167. if resp is not None and 'code' in resp and resp['code'] == 0:
  168. if 'total' in resp['data'] and resp['data']['total'] > 0:
  169. return resp['data']['data']
  170. return None
  171. def get_sop_node_list(sop_stage_ids):
  172. url = "https://wxadminapi.gkscrm.com/wechat-api/api/sop_node/list"
  173. token = str(conf().get("token"))
  174. payload = json.dumps({
  175. 'stageIds': sop_stage_ids,
  176. })
  177. headers = {
  178. 'Content-Type': 'application/json',
  179. 'Authorization': token,
  180. }
  181. response = requests.request("POST", url, headers=headers, data=payload)
  182. # print(f"get_sop_node_list = {response.text}")
  183. if response.status_code == 200:
  184. resp = json.loads(response.text)
  185. # print(f"len={len(resp['data'])}, resp={resp}")
  186. if resp is not None and 'code' in resp and resp['code'] == 0:
  187. if 'data' in resp and len(resp['data']) > 0:
  188. return resp['data']
  189. return None