1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- from datetime import datetime
- from sqlite3 import Cursor
- from typing import Optional, TypedDict
- batch_task_status = {
- 1: "等待中",
- 2: "已开始",
- 3: "已停止",
- 4: "已完成"
- }
- class BatchTaskModel(TypedDict):
- id: int
- created_at: int
- status: int # 1 等待中 2 已开始 3 已停止 4 已完成
- wx_wxid: str
- content: str
- total: int
- success: int
- fail: int
- def batch_task_get_list(cur: Cursor, wx_wxid: str) -> list[BatchTaskModel]:
- query = "SELECT * FROM batch_task WHERE wx_wxid = ? ORDER BY created_at DESC"
- cur.execute(query, (wx_wxid,))
- results = cur.fetchall()
- tasks: list[BatchTaskModel] = []
- for result in results:
- task: BatchTaskModel = {
- "id": result[0],
- "created_at": result[1],
- "status": result[2],
- "wx_wxid": result[3],
- "content": result[4],
- "total": result[5],
- "success": result[6],
- "fail": result[7]
- }
- tasks.append(task)
- return tasks
- def batch_task_get_earliest_task_to_execute(cur: Cursor, wx_wxid: str) -> Optional[BatchTaskModel]:
- query = "SELECT * FROM batch_task WHERE wx_wxid = ? AND (status = 1 OR status = 2) ORDER BY created_at ASC LIMIT 1"
- cur.execute(query, (wx_wxid,))
- result = cur.fetchone()
- if result is None:
- return None
- task: BatchTaskModel = {
- "id": result[0],
- "created_at": result[1],
- "status": result[2],
- "wx_wxid": result[3],
- "content": result[4],
- "total": result[5],
- "success": result[6],
- "fail": result[7]
- }
- return task
- def batch_task_create(cur: Cursor, wx_wxid: str, content: str, total: int) -> int:
- created_at = int(datetime.now().timestamp())
- insert_query = "INSERT INTO batch_task (created_at, wx_wxid, content, total) VALUES (?, ?, ?, ?)"
- data_to_insert = (created_at, wx_wxid, content, total)
- cur.execute(insert_query, data_to_insert)
- return cur.lastrowid
- def batch_task_update_status(cur: Cursor, task_id: int, status: int):
- update_query = "UPDATE batch_task SET status = ? WHERE id = ?"
- data_to_update = (status, task_id)
- cur.execute(update_query, data_to_update)
- def batch_task_update_success(cur: Cursor, task_id: int, success: int):
- update_query = "UPDATE batch_task SET success = ? WHERE id = ?"
- data_to_update = (success, task_id)
- cur.execute(update_query, data_to_update)
- def batch_task_update_fail(cur: Cursor, task_id: int, fail: int):
- update_query = "UPDATE batch_task SET fail = ? WHERE id = ?"
- data_to_update = (fail, task_id)
- cur.execute(update_query, data_to_update)
|