batch_task.py 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. from datetime import datetime
  2. from sqlite3 import Cursor
  3. from typing import Optional, TypedDict
  4. batch_task_status = {
  5. 1: "等待中",
  6. 2: "已开始",
  7. 3: "已停止",
  8. 4: "已完成"
  9. }
  10. class BatchTaskModel(TypedDict):
  11. id: int
  12. created_at: int
  13. status: int # 1 等待中 2 已开始 3 已停止 4 已完成
  14. wx_wxid: str
  15. content: str
  16. total: int
  17. success: int
  18. fail: int
  19. def batch_task_get_list(cur: Cursor, wx_wxid: str) -> list[BatchTaskModel]:
  20. query = "SELECT * FROM batch_task WHERE wx_wxid = ? ORDER BY created_at DESC"
  21. cur.execute(query, (wx_wxid,))
  22. results = cur.fetchall()
  23. tasks: list[BatchTaskModel] = []
  24. for result in results:
  25. task: BatchTaskModel = {
  26. "id": result[0],
  27. "created_at": result[1],
  28. "status": result[2],
  29. "wx_wxid": result[3],
  30. "content": result[4],
  31. "total": result[5],
  32. "success": result[6],
  33. "fail": result[7]
  34. }
  35. tasks.append(task)
  36. return tasks
  37. def batch_task_get_earliest_task_to_execute(cur: Cursor, wx_wxid: str) -> Optional[BatchTaskModel]:
  38. query = "SELECT * FROM batch_task WHERE wx_wxid = ? AND (status = 1 OR status = 2) ORDER BY created_at ASC LIMIT 1"
  39. cur.execute(query, (wx_wxid,))
  40. result = cur.fetchone()
  41. if result is None:
  42. return None
  43. task: BatchTaskModel = {
  44. "id": result[0],
  45. "created_at": result[1],
  46. "status": result[2],
  47. "wx_wxid": result[3],
  48. "content": result[4],
  49. "total": result[5],
  50. "success": result[6],
  51. "fail": result[7]
  52. }
  53. return task
  54. def batch_task_create(cur: Cursor, wx_wxid: str, content: str, total: int) -> int:
  55. created_at = int(datetime.now().timestamp())
  56. insert_query = "INSERT INTO batch_task (created_at, wx_wxid, content, total) VALUES (?, ?, ?, ?)"
  57. data_to_insert = (created_at, wx_wxid, content, total)
  58. cur.execute(insert_query, data_to_insert)
  59. return cur.lastrowid
  60. def batch_task_update_status(cur: Cursor, task_id: int, status: int):
  61. update_query = "UPDATE batch_task SET status = ? WHERE id = ?"
  62. data_to_update = (status, task_id)
  63. cur.execute(update_query, data_to_update)
  64. def batch_task_update_success(cur: Cursor, task_id: int, success: int):
  65. update_query = "UPDATE batch_task SET success = ? WHERE id = ?"
  66. data_to_update = (success, task_id)
  67. cur.execute(update_query, data_to_update)
  68. def batch_task_update_fail(cur: Cursor, task_id: int, fail: int):
  69. update_query = "UPDATE batch_task SET fail = ? WHERE id = ?"
  70. data_to_update = (fail, task_id)
  71. cur.execute(update_query, data_to_update)