123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- import threading
- import time
- from common.sql_lite import init_new_db_connection
- from db.batch_msg import batch_msg_get_earliest_msg_to_execute, batch_msg_update_status
- from db.batch_task import batch_task_get_earliest_task_to_execute, batch_task_update_status, batch_task_update_success, \
- batch_task_update_fail
- from service.robot import get_robot
- batch_task_event = threading.Event()
- class BatchTaskService:
- def __init__(self):
- self._robot = get_robot()
- self._wx_wxid = self._robot.wcf.get_self_wxid()
- self._execute_task_info = None
- def execute_task(self):
- connection = init_new_db_connection()
- try:
- while batch_task_event.is_set():
- # 查询最早的一条待执行或正在执行的任务
- cursor = connection.cursor()
- try:
- task_result = batch_task_get_earliest_task_to_execute(cursor, self._wx_wxid)
- if task_result is not None:
- if task_result["status"] == 1:
- # 更新任务状态为正在执行
- batch_task_update_status(cursor, task_result["id"], 2)
- # 获取一条待发送消息
- msg_result = batch_msg_get_earliest_msg_to_execute(cursor, task_result["id"])
- if msg_result is None:
- # 如果已没有待发送消息,则将任务状态改为已完成
- batch_task_update_status(cursor, task_result["id"], 4)
- else:
- # 如果有待发送消息,则发送
- send_result = self._robot.sendTextMsg(task_result["content"], msg_result["wxid"])
- if send_result == 0:
- batch_task_update_success(cursor, task_result["id"], task_result["success"]+1)
- batch_msg_update_status(cursor, msg_result["id"], 2)
- else:
- batch_task_update_fail(cursor, task_result["id"], task_result["fail"]+1)
- batch_msg_update_status(cursor, msg_result["id"], 3)
- connection.commit()
- except Exception as e:
- # 回滚事务
- connection.rollback()
- print(f"发生错误: {e}")
- finally:
- # 确保资源被正确释放
- cursor.close()
- time.sleep(3)
- finally:
- connection.close()
- def start_batch_task():
- global batch_task_event
- print("Start task.")
- # 设置事件为 True
- batch_task_event.set()
- threading.Thread(target=BatchTaskService().execute_task).start()
- def stop_batch_task():
- global batch_task_event
- # 清除事件
- batch_task_event.clear()
|