from datetime import datetime from sqlite3 import Cursor from typing import Optional, TypedDict batch_task_status = { 1: "等待中", 2: "已开始", 3: "已停止", 4: "已完成" } class BatchTaskModel(TypedDict): id: int created_at: int status: int # 1 等待中 2 已开始 3 已停止 4 已完成 wx_wxid: str content: str total: int success: int fail: int def batch_task_get_list(cur: Cursor, wx_wxid: str) -> list[BatchTaskModel]: query = "SELECT * FROM batch_task WHERE wx_wxid = ? ORDER BY created_at DESC" cur.execute(query, (wx_wxid,)) results = cur.fetchall() tasks: list[BatchTaskModel] = [] for result in results: task: BatchTaskModel = { "id": result[0], "created_at": result[1], "status": result[2], "wx_wxid": result[3], "content": result[4], "total": result[5], "success": result[6], "fail": result[7] } tasks.append(task) return tasks def batch_task_get_earliest_task_to_execute(cur: Cursor, wx_wxid: str) -> Optional[BatchTaskModel]: query = "SELECT * FROM batch_task WHERE wx_wxid = ? AND (status = 1 OR status = 2) ORDER BY created_at ASC LIMIT 1" cur.execute(query, (wx_wxid,)) result = cur.fetchone() if result is None: return None task: BatchTaskModel = { "id": result[0], "created_at": result[1], "status": result[2], "wx_wxid": result[3], "content": result[4], "total": result[5], "success": result[6], "fail": result[7] } return task def batch_task_create(cur: Cursor, wx_wxid: str, content: str, total: int) -> int: created_at = int(datetime.now().timestamp()) insert_query = "INSERT INTO batch_task (created_at, wx_wxid, content, total) VALUES (?, ?, ?, ?)" data_to_insert = (created_at, wx_wxid, content, total) cur.execute(insert_query, data_to_insert) return cur.lastrowid def batch_task_update_status(cur: Cursor, task_id: int, status: int): update_query = "UPDATE batch_task SET status = ? WHERE id = ?" data_to_update = (status, task_id) cur.execute(update_query, data_to_update) def batch_task_update_success(cur: Cursor, task_id: int, success: int): update_query = "UPDATE batch_task SET success = ? WHERE id = ?" data_to_update = (success, task_id) cur.execute(update_query, data_to_update) def batch_task_update_fail(cur: Cursor, task_id: int, fail: int): update_query = "UPDATE batch_task SET fail = ? WHERE id = ?" data_to_update = (fail, task_id) cur.execute(update_query, data_to_update)