123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- import threading
- import time
- import json
- import re
- import requests
- from common.sql_lite import init_new_db_connection
- from service.robot import get_robot
- from config import conf
- from db.sop import sop_node_get_list_by_condition
- from db.message_records import get_message_by_condition, get_message_by_contact_new_one, create_message_record
- sop_task_event = threading.Event()
class SopTaskService:
    """Worker that scans SOP nodes and queues follow-up message records.

    ``execute_task`` loops for as long as the module-level ``sop_task_event``
    is set.  Each pass loads the SOP nodes, looks up message records inside a
    two-minute window derived from the node's ``no_reply_unit`` and, for every
    record that is still the latest one of its contact, creates new message
    records for the node's ``action_message`` and ``action_forward`` settings.
    """

    # Look-back distance in seconds for each recognised ``no_reply_unit``.
    _UNIT_SECONDS = {
        'W': 60 * 24 * 7 * 60,
        'D': 60 * 24 * 60,
        'h': 60 * 60,
    }
    # Width of the matching window in seconds.
    _WINDOW_SECONDS = 60 * 2
    # ASCII comma, fullwidth (Chinese) comma and ideographic comma.  Written
    # with escapes so the fullwidth comma cannot be silently normalised away.
    _SEPARATORS = re.compile("[,\uff0c\u3001]")

    def __init__(self):
        self._robot = get_robot()
        self._wx_wxid = self._robot.wcf.get_self_wxid()
        self._execute_task_info = None

    def execute_task(self):
        """Run the polling loop until ``sop_task_event`` is cleared.

        Every pass uses its own cursor, which is closed before the pass is
        committed.  An error while processing nodes is printed and the pass is
        still committed (as before); an error from the connection itself
        (creating/closing a cursor, committing) rolls back and ends the loop.
        """
        connection = init_new_db_connection()
        try:
            # NOTE(review): the loop has no delay between passes, so it polls
            # the database continuously while the event is set — confirm
            # this is intended.
            while sop_task_event.is_set():
                cursor = connection.cursor()
                try:
                    self._process_nodes(cursor)
                except Exception as e:
                    print(f"发生错误: {e}")
                finally:
                    # Close per pass; previously only the last cursor was
                    # closed, and the name was unbound if the loop never ran.
                    cursor.close()
                connection.commit()
        except Exception as e:
            # Roll back the open transaction.
            connection.rollback()
            print(f"发生错误: {e}")

    def _reply_window(self, unit):
        """Return ``(start_time, end_time)`` for a node's ``no_reply_unit``.

        For 'W', 'D' and 'h' the window starts one unit plus two minutes in
        the past.  Any other value keeps the original fallback of a window
        starting at the current time.
        """
        now = time.time()
        seconds = self._UNIT_SECONDS.get(unit)
        if seconds is None:
            start_time = now
        else:
            start_time = now - seconds - self._WINDOW_SECONDS
        return start_time, start_time + self._WINDOW_SECONDS

    def _process_nodes(self, cursor):
        """Handle every SOP node returned by ``sop_node_get_list_by_condition``."""
        for node in sop_node_get_list_by_condition(cursor):
            start_time, end_time = self._reply_window(node['no_reply_unit'])
            # The numeric arguments are passed exactly as the original calls
            # did; their meaning is defined by ``db.message_records``.
            if node['parent_id'] == 0:
                message_records = get_message_by_condition(
                    cursor, 3, 3, node['stage_id'], 0, start_time, end_time)
            else:
                message_records = get_message_by_condition(
                    cursor, 3, 4, node['parent_id'], 0, start_time, end_time)
            for record in message_records:
                latest = get_message_by_contact_new_one(
                    cursor, record['contact_id'], 3)
                # Only act when the matched record is still the newest one
                # for its contact; a missing result is skipped instead of
                # raising and aborting the whole pass.
                if latest is None or latest['id'] != record['id']:
                    continue
                self._queue_action_messages(cursor, node, record)
                self._queue_forwards(cursor, node, record)

    def _queue_action_messages(self, cursor, node, record):
        """Create the message records for the node's ``action_message``."""
        if node['action_message'] is None:
            # NOTE(review): indentation was lost in the source; this branch is
            # read as the ``else`` of the ``action_message`` check because it
            # writes a single empty record with index 0 — confirm.
            create_message_record(cursor,
                                  1,
                                  record['bot_wxid'],
                                  record['contact_id'],
                                  record['contact_type'],
                                  record['contact_wxid'],
                                  1,
                                  '',
                                  {},
                                  4,
                                  node['id'],
                                  0,
                                  record['organization_id'])
            return
        # The guard checks the node, so the payload is parsed from the node
        # too (the original parsed ``action_message`` from the message record),
        # mirroring how ``action_forward`` is handled.
        action_messages = json.loads(node['action_message'])
        for idx, ac in enumerate(action_messages):
            # NOTE(review): the record is created only for actions whose
            # ``meta`` is set; this is the reading under which ``meta`` is
            # always bound — confirm against the intended behaviour.
            if ac['meta'] is None:
                continue
            meta = json.dumps({
                "wxid": record['bot_wxid'],
                "filepath": ac['content'],
                "diyfilename": ac['filename']
            })
            create_message_record(cursor,
                                  1,
                                  record['bot_wxid'],
                                  record['contact_id'],
                                  record['contact_type'],
                                  record['contact_wxid'],
                                  ac['content_type'],
                                  ac['content'],
                                  meta,
                                  4,
                                  node['id'],
                                  idx,
                                  record['organization_id'])

    def _queue_forwards(self, cursor, node, record):
        """Create the message records for the node's ``action_forward``."""
        if node['action_forward'] is None:
            return
        action_forward = json.loads(node['action_forward'])
        if action_forward['wxid'] == "":
            return
        for forward_wxid in self.split_string(action_forward['wxid']):
            for idx, message in enumerate(action_forward['action']):
                meta = json.dumps({
                    "wxid": record['bot_wxid'],
                    "filepath": message['content'],
                    "diyfilename": message['filename']
                })
                create_message_record(cursor,
                                      1,
                                      record['bot_wxid'],
                                      0,
                                      0,
                                      forward_wxid,
                                      message['type'],
                                      message['content'],
                                      meta,
                                      4,
                                      node['id'],
                                      record['contact_id'] + idx,
                                      record['organization_id'])

    @staticmethod
    def split_string(input_string):
        """Split a separator-delimited string into its non-empty parts.

        Recognised separators are the ASCII comma, the fullwidth comma and
        the ideographic comma.  Parts are stripped of surrounding whitespace
        and empty parts are dropped.  Declared static because it takes no
        ``self`` yet is called as ``self.split_string(...)``.
        """
        parts = (part.strip()
                 for part in SopTaskService._SEPARATORS.split(input_string))
        return [part for part in parts if part]
def start_sop_task():
    """Set ``sop_task_event`` and start ``execute_task`` in a new thread.

    A fresh ``SopTaskService`` is constructed on every call, and its
    ``execute_task`` method becomes the target of the started thread.
    """
    print("Start SOP task.")
    # Raise the flag first so the worker loop runs as soon as it starts.
    sop_task_event.set()
    worker = threading.Thread(target=SopTaskService().execute_task)
    worker.start()
def stop_sop_task():
    """Clear ``sop_task_event`` so the polling loop stops, then report it."""
    # The event is only mutated, never rebound, so no ``global`` is needed.
    sop_task_event.clear()
    print("Stop SOP task.")
def get_sop_task_list(timeout=30):
    """Fetch the sop_task list from the admin API.

    Posts ``{'page': 1, 'pageSize': 1000}`` with the configured token as the
    ``Authorization`` header.

    Args:
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error; without it the call could block indefinitely.

    Returns:
        The ``data.data`` value of the response when the HTTP status is 200,
        ``code`` is 0 and ``data.total`` is greater than 0; otherwise ``None``
        (including when the body is not valid JSON or ``data`` is missing).
    """
    url = "https://wxadminapi.gkscrm.com/wechat-api/api/sop_task/list"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': str(conf().get("token")),
    }
    payload = json.dumps({
        'page': 1,
        'pageSize': 1000,
    })

    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    if response.status_code != 200:
        return None

    try:
        resp = json.loads(response.text)
    except ValueError:
        # A 200 response with a non-JSON body is treated as "no result".
        return None
    if not isinstance(resp, dict) or resp.get('code') != 0:
        return None

    # ``data`` may be absent or null; guard before looking inside it.
    data = resp.get('data')
    if not isinstance(data, dict):
        return None
    total = data.get('total')
    if isinstance(total, (int, float)) and total > 0:
        return data.get('data')
    return None
def get_sop_stage_list(sop_task_ids, timeout=30):
    """Fetch the sop_stage list for the given task ids from the admin API.

    Args:
        sop_task_ids: Value sent as ``taskIds`` in the JSON request body.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error; without it the call could block indefinitely.

    Returns:
        The ``data.data`` value of the response when the HTTP status is 200,
        ``code`` is 0 and ``data.total`` is greater than 0; otherwise ``None``
        (including when the body is not valid JSON or ``data`` is missing).
    """
    url = "https://wxadminapi.gkscrm.com/wechat-api/api/sop_stage/list"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': str(conf().get("token")),
    }
    payload = json.dumps({
        'taskIds': sop_task_ids,
    })

    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    print(f"get_sop_stage_list = {response.text}")
    if response.status_code != 200:
        return None

    try:
        resp = json.loads(response.text)
    except ValueError:
        # A 200 response with a non-JSON body is treated as "no result".
        return None
    if not isinstance(resp, dict) or resp.get('code') != 0:
        return None

    # ``data`` may be absent or null; guard before looking inside it.
    data = resp.get('data')
    if not isinstance(data, dict):
        return None
    total = data.get('total')
    if isinstance(total, (int, float)) and total > 0:
        return data.get('data')
    return None
def get_sop_node_list(sop_stage_ids, timeout=30):
    """Fetch the sop_node list for the given stage ids from the admin API.

    Args:
        sop_stage_ids: Value sent as ``stageIds`` in the JSON request body.
        timeout: Seconds to wait for the server before ``requests`` raises a
            timeout error; without it the call could block indefinitely.

    Returns:
        The response's ``data`` value when the HTTP status is 200, ``code``
        is 0 and ``data`` is non-empty; otherwise ``None`` (including when
        the body is not valid JSON or ``data`` is missing or null).
    """
    url = "https://wxadminapi.gkscrm.com/wechat-api/api/sop_node/list"
    headers = {
        'Content-Type': 'application/json',
        'Authorization': str(conf().get("token")),
    }
    payload = json.dumps({
        'stageIds': sop_stage_ids,
    })

    response = requests.post(url, headers=headers, data=payload, timeout=timeout)
    if response.status_code != 200:
        return None

    try:
        resp = json.loads(response.text)
    except ValueError:
        # A 200 response with a non-JSON body is treated as "no result".
        return None
    if not isinstance(resp, dict) or resp.get('code') != 0:
        return None

    # Truthiness covers both a null ``data`` (which ``len`` would reject)
    # and an empty collection.
    data = resp.get('data')
    if data:
        return data
    return None
|