import threading import time from common.sql_lite import init_new_db_connection from db.batch_msg import batch_msg_get_earliest_msg_to_execute, batch_msg_update_status from db.batch_task import batch_task_get_earliest_task_to_execute, batch_task_update_status, batch_task_update_success, \ batch_task_update_fail from service.robot import get_robot batch_task_event = threading.Event() class BatchTaskService: def __init__(self): self._robot = get_robot() self._wx_wxid = self._robot.wcf.get_self_wxid() self._execute_task_info = None def execute_task(self): connection = init_new_db_connection() try: while batch_task_event.is_set(): # 查询最早的一条待执行或正在执行的任务 cursor = connection.cursor() try: task_result = batch_task_get_earliest_task_to_execute(cursor, self._wx_wxid) if task_result is not None: if task_result["status"] == 1: # 更新任务状态为正在执行 batch_task_update_status(cursor, task_result["id"], 2) # 获取一条待发送消息 msg_result = batch_msg_get_earliest_msg_to_execute(cursor, task_result["id"]) if msg_result is None: # 如果已没有待发送消息,则将任务状态改为已完成 batch_task_update_status(cursor, task_result["id"], 4) else: # 如果有待发送消息,则发送 send_result = self._robot.sendTextMsg(task_result["content"], msg_result["wxid"]) if send_result == 0: batch_task_update_success(cursor, task_result["id"], task_result["success"]+1) batch_msg_update_status(cursor, msg_result["id"], 2) else: batch_task_update_fail(cursor, task_result["id"], task_result["fail"]+1) batch_msg_update_status(cursor, msg_result["id"], 3) connection.commit() except Exception as e: # 回滚事务 connection.rollback() print(f"发生错误: {e}") finally: # 确保资源被正确释放 cursor.close() time.sleep(3) finally: connection.close() def start_batch_task(): global batch_task_event print("Start task.") # 设置事件为 True batch_task_event.set() threading.Thread(target=BatchTaskService().execute_task).start() def stop_batch_task(): global batch_task_event # 清除事件 batch_task_event.clear()