"""SOP follow-up task worker and helpers for fetching SOP definitions.

The worker thread repeatedly scans SOP nodes, finds message records that fall
inside each node's "no reply" time window and queues the node's follow-up
messages / forwards as new message records.  The ``get_sop_*_list`` functions
fetch SOP task, stage and node definitions from the admin HTTP API.
"""
import json
import re
import threading
import time
from contextlib import closing

import requests

from common.sql_lite import init_new_db_connection
from service.robot import get_robot
from config import conf
from db.sop import sop_node_get_list_by_condition
from db.message_records import (
    get_message_by_condition,
    get_message_by_contact_new_one,
    create_message_record,
)

# Set while the worker loop should keep running; cleared to ask it to stop.
sop_task_event = threading.Event()

# Look-back distance, in seconds, for each supported ``no_reply_unit`` value.
_NO_REPLY_SECONDS = {
    'W': 60 * 24 * 7 * 60,
    'D': 60 * 24 * 60,
    'h': 60 * 60,
}
# Width, in seconds, of the time window scanned for candidate records.
_WINDOW_SECONDS = 60 * 2

# ASCII comma, full-width comma (U+FF0C) and ideographic comma (U+3001).
_WXID_SEPARATORS = re.compile(r'[,\uFF0C\u3001]')

_API_BASE = "https://wxadminapi.gkscrm.com/wechat-api/api"
# Upper bound, in seconds, for each HTTP call so a stalled server cannot
# block the caller forever.
_REQUEST_TIMEOUT = 30


def _no_reply_window(unit, now):
    """Return ``(start_time, end_time)`` of the scan window for *unit*.

    For a known unit the window ends exactly one unit before *now* and is
    ``_WINDOW_SECONDS`` wide.  For any other unit the window starts at *now*.
    """
    lookback = _NO_REPLY_SECONDS.get(unit)
    start_time = now if lookback is None else now - lookback - _WINDOW_SECONDS
    return start_time, start_time + _WINDOW_SECONDS


class SopTaskService:
    """Background worker that turns due SOP nodes into message records."""

    def __init__(self):
        self._robot = get_robot()
        self._wx_wxid = self._robot.wcf.get_self_wxid()
        self._execute_task_info = None

    def execute_task(self):
        """Run the scan loop until ``sop_task_event`` is cleared.

        Each pass opens its own cursor, processes every SOP node and commits.
        An error inside a pass is printed and the pass is still committed; an
        error outside of it rolls back and ends the loop.
        """
        connection = init_new_db_connection()
        try:
            # NOTE(review): the loop has no pause between passes, so it polls
            # the database continuously while the event is set.
            while sop_task_event.is_set():
                # A fresh cursor per pass, always closed when the pass ends.
                with closing(connection.cursor()) as cursor:
                    try:
                        for node in sop_node_get_list_by_condition(cursor) or ():
                            self._process_node(cursor, node)
                    except Exception as e:
                        print(f"发生错误: {e}")
                    connection.commit()
        except Exception as e:
            # Roll back the open transaction.
            connection.rollback()
            print(f"发生错误: {e}")
        finally:
            # NOTE(review): assumes init_new_db_connection() hands out a
            # connection owned by this call, so it is released here.
            connection.close()

    def _process_node(self, cursor, node):
        """Queue follow-ups for every record in *node*'s no-reply window."""
        start_time, end_time = _no_reply_window(node['no_reply_unit'], time.time())
        if node['parent_id'] == 0:
            message_records = get_message_by_condition(
                cursor, 3, 3, node['stage_id'], 0, start_time, end_time)
        else:
            message_records = get_message_by_condition(
                cursor, 3, 4, node['parent_id'], 0, start_time, end_time)

        for s in message_records or ():
            latest = get_message_by_contact_new_one(cursor, s['contact_id'], 3)
            # Act only when this record is still the newest one returned for
            # the contact.
            if latest is None or latest['id'] != s['id']:
                continue
            self._queue_action_messages(cursor, node, s)
            self._queue_action_forward(cursor, node, s)

    @staticmethod
    def _queue_action_messages(cursor, node, s):
        """Create one record per entry of the node's ``action_message``.

        NOTE(review): the original source had lost its indentation, so the
        nesting here is reconstructed.  It is read as: a node without
        ``action_message`` gets a single empty placeholder record, and
        entries whose ``meta`` is None are skipped.  Confirm against the
        intended behaviour.
        """
        if node['action_message'] is None:
            create_message_record(
                cursor, 1, s['bot_wxid'], s['contact_id'], s['contact_type'],
                s['contact_wxid'], 1, '', {}, 4, node['id'], 0,
                s['organization_id'])
            return

        # The guard above checks the node, so the node's value is what gets
        # parsed (the record ``s`` was parsed here before).
        for idx, ac in enumerate(json.loads(node['action_message'])):
            if ac['meta'] is None:
                continue
            meta = json.dumps({
                "wxid": s['bot_wxid'],
                "filepath": ac['content'],
                "diyfilename": ac['filename']
            })
            create_message_record(
                cursor, 1, s['bot_wxid'], s['contact_id'], s['contact_type'],
                s['contact_wxid'], ac['content_type'], ac['content'], meta,
                4, node['id'], idx, s['organization_id'])

    def _queue_action_forward(self, cursor, node, s):
        """Create forward records for each target wxid of ``action_forward``."""
        if node['action_forward'] is None:
            return
        action_forward = json.loads(node['action_forward'])
        if action_forward['wxid'] == "":
            return

        for forward_wxid in self.split_string(action_forward['wxid']):
            for idx, message in enumerate(action_forward['action']):
                meta = json.dumps({
                    "wxid": s['bot_wxid'],
                    "filepath": message['content'],
                    "diyfilename": message['filename']
                })
                create_message_record(
                    cursor, 1, s['bot_wxid'], 0, 0, forward_wxid,
                    message['type'], message['content'], meta, 4, node['id'],
                    s['contact_id'] + idx, s['organization_id'])

    @staticmethod
    def split_string(input_string):
        """Split *input_string* on ASCII, full-width or ideographic commas.

        Surrounding whitespace is stripped from each part and empty parts are
        dropped, so ``"a, b,"`` yields ``["a", "b"]``.

        Declared static because it is called as ``self.split_string(value)``
        with a single argument.
        """
        parts = (part.strip() for part in _WXID_SEPARATORS.split(input_string))
        return [part for part in parts if part]


# Module-level alias, kept in case callers import the helper directly.
split_string = SopTaskService.split_string


def start_sop_task():
    """Set the run flag and start the worker loop on a new thread."""
    print("Start SOP task.")
    sop_task_event.set()
    threading.Thread(target=SopTaskService().execute_task).start()


def stop_sop_task():
    """Clear the run flag so the worker loop exits after its current pass."""
    sop_task_event.clear()
    print("Stop SOP task.")


def _post_json(url, payload):
    """POST *payload* as JSON to *url* with the configured token."""
    headers = {
        'Content-Type': 'application/json',
        'Authorization': str(conf().get("token")),
    }
    return requests.request("POST", url, headers=headers,
                            data=json.dumps(payload),
                            timeout=_REQUEST_TIMEOUT)


def _successful_body(response):
    """Return the decoded body when HTTP 200 and ``code == 0``, else None."""
    if response.status_code != 200:
        return None
    try:
        body = json.loads(response.text)
    except ValueError:
        # A body that is not valid JSON is treated like any other failure.
        return None
    if isinstance(body, dict) and body.get('code') == 0:
        return body
    return None


def _paged_rows(body):
    """Return ``body['data']['data']`` when ``total`` is positive, else None."""
    if body is None:
        return None
    data = body.get('data')
    if isinstance(data, dict) and (data.get('total') or 0) > 0:
        return data.get('data')
    return None


def get_sop_task_list():
    """Fetch the SOP task list from the admin API.

    Returns the list of rows, or None when the call fails or is empty.
    """
    response = _post_json(f"{_API_BASE}/sop_task/list", {
        'page': 1,
        'pageSize': 1000,
    })
    return _paged_rows(_successful_body(response))


def get_sop_stage_list(sop_task_ids):
    """Fetch the SOP stages of the given task ids from the admin API.

    Returns the list of rows, or None when the call fails or is empty.
    """
    response = _post_json(f"{_API_BASE}/sop_stage/list", {
        'taskIds': sop_task_ids,
    })
    print(f"get_sop_stage_list = {response.text}")
    return _paged_rows(_successful_body(response))


def get_sop_node_list(sop_stage_ids):
    """Fetch the SOP nodes of the given stage ids from the admin API.

    Returns the non-empty ``data`` value, or None when the call fails or
    ``data`` is empty.
    """
    response = _post_json(f"{_API_BASE}/sop_node/list", {
        'stageIds': sop_stage_ids,
    })
    body = _successful_body(response)
    if body is None:
        return None
    data = body.get('data')
    return data if data else None