batch_msg.py 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from datetime import datetime
  2. from sqlite3 import Cursor
  3. from typing import TypedDict, Optional
  4. batch_msg_status = {
  5. 1: "待发送",
  6. 2: "成功",
  7. 3: "失败",
  8. }
  9. batch_msg_type = {
  10. 1: "联系人",
  11. 2: "群组",
  12. }
  13. class BatchMsgModel(TypedDict):
  14. id: int
  15. batch_task_id: int
  16. created_at: int
  17. updated_at: int
  18. status: int # 1 待发送 2 成功 3 失败
  19. type: int # 1 联系人 2 群组
  20. wx_wxid: str
  21. wxid: str
  22. nickname: str
  23. def batch_msg_get_list(cur: Cursor, task_id: int) -> list[BatchMsgModel]:
  24. query = "SELECT * FROM batch_msg WHERE batch_task_id = ?"
  25. cur.execute(query, (task_id,))
  26. results = cur.fetchall()
  27. msgs: list[BatchMsgModel] = []
  28. for result in results:
  29. msg: BatchMsgModel = {
  30. "id": result[0],
  31. "batch_task_id": result[1],
  32. "created_at": result[2],
  33. "updated_at": result[3],
  34. "status": result[4],
  35. "type": result[5],
  36. "wx_wxid": result[6],
  37. "wxid": result[7],
  38. "nickname": result[8]
  39. }
  40. msgs.append(msg)
  41. return msgs
  42. def batch_msg_get_earliest_msg_to_execute(cur: Cursor, task_id: int) -> Optional[BatchMsgModel]:
  43. query = "SELECT * FROM batch_msg WHERE batch_task_id = ? AND (status = 1 OR status = 3) ORDER BY created_at ASC LIMIT 1"
  44. cur.execute(query, (task_id,))
  45. result = cur.fetchone()
  46. if result is None:
  47. return None
  48. task: BatchMsgModel = {
  49. "id": result[0],
  50. "batch_task_id": result[1],
  51. "created_at": result[2],
  52. "updated_at": result[3],
  53. "status": result[4],
  54. "type": result[5],
  55. "wx_wxid": result[6],
  56. "wxid": result[7],
  57. "nickname": result[8]
  58. }
  59. return task
  60. def batch_msg_create_many(cur: Cursor, msg_list: list[tuple]):
  61. cur.executemany(
  62. 'INSERT INTO batch_msg (batch_task_id, created_at, type, wx_wxid, wxid, nickname) VALUES (?,?,?,?,?,?)',
  63. msg_list)
  64. def batch_msg_update_status(cur: Cursor, msg_id: int, status: int):
  65. update_query = "UPDATE batch_msg SET status = ? WHERE id = ?"
  66. data_to_update = (status, msg_id)
  67. cur.execute(update_query, data_to_update)