batch_task.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import threading
  2. import time
  3. from common.sql_lite import init_new_db_connection
  4. from db.batch_msg import batch_msg_get_earliest_msg_to_execute, batch_msg_update_status
  5. from db.batch_task import batch_task_get_earliest_task_to_execute, batch_task_update_status, batch_task_update_success, \
  6. batch_task_update_fail
  7. from service.robot import get_robot
  8. batch_task_event = threading.Event()
  9. class BatchTaskService:
  10. def __init__(self):
  11. self._robot = get_robot()
  12. self._wx_wxid = self._robot.wcf.get_self_wxid()
  13. self._execute_task_info = None
  14. def execute_task(self):
  15. connection = init_new_db_connection()
  16. try:
  17. while batch_task_event.is_set():
  18. # 查询最早的一条待执行或正在执行的任务
  19. cursor = connection.cursor()
  20. try:
  21. task_result = batch_task_get_earliest_task_to_execute(cursor, self._wx_wxid)
  22. if task_result is not None:
  23. if task_result["status"] == 1:
  24. # 更新任务状态为正在执行
  25. batch_task_update_status(cursor, task_result["id"], 2)
  26. # 获取一条待发送消息
  27. msg_result = batch_msg_get_earliest_msg_to_execute(cursor, task_result["id"])
  28. if msg_result is None:
  29. # 如果已没有待发送消息,则将任务状态改为已完成
  30. batch_task_update_status(cursor, task_result["id"], 4)
  31. else:
  32. # 如果有待发送消息,则发送
  33. send_result = self._robot.sendTextMsg(task_result["content"], msg_result["wxid"])
  34. if send_result == 0:
  35. batch_task_update_success(cursor, task_result["id"], task_result["success"]+1)
  36. batch_msg_update_status(cursor, msg_result["id"], 2)
  37. else:
  38. batch_task_update_fail(cursor, task_result["id"], task_result["fail"]+1)
  39. batch_msg_update_status(cursor, msg_result["id"], 3)
  40. connection.commit()
  41. except Exception as e:
  42. # 回滚事务
  43. connection.rollback()
  44. print(f"发生错误: {e}")
  45. finally:
  46. # 确保资源被正确释放
  47. cursor.close()
  48. time.sleep(3)
  49. finally:
  50. connection.close()
  51. def start_batch_task():
  52. global batch_task_event
  53. print("Start task.")
  54. # 设置事件为 True
  55. batch_task_event.set()
  56. threading.Thread(target=BatchTaskService().execute_task).start()
  57. def stop_batch_task():
  58. global batch_task_event
  59. # 清除事件
  60. batch_task_event.clear()