from datetime import datetime from sqlite3 import Cursor from typing import TypedDict, Optional batch_msg_status = { 1: "待发送", 2: "成功", 3: "失败", } batch_msg_type = { 1: "联系人", 2: "群组", } class BatchMsgModel(TypedDict): id: int batch_task_id: int created_at: int updated_at: int status: int # 1 待发送 2 成功 3 失败 type: int # 1 联系人 2 群组 wx_wxid: str wxid: str nickname: str def batch_msg_get_list(cur: Cursor, task_id: int) -> list[BatchMsgModel]: query = "SELECT * FROM batch_msg WHERE batch_task_id = ?" cur.execute(query, (task_id,)) results = cur.fetchall() msgs: list[BatchMsgModel] = [] for result in results: msg: BatchMsgModel = { "id": result[0], "batch_task_id": result[1], "created_at": result[2], "updated_at": result[3], "status": result[4], "type": result[5], "wx_wxid": result[6], "wxid": result[7], "nickname": result[8] } msgs.append(msg) return msgs def batch_msg_get_earliest_msg_to_execute(cur: Cursor, task_id: int) -> Optional[BatchMsgModel]: query = "SELECT * FROM batch_msg WHERE batch_task_id = ? AND (status = 1 OR status = 3) ORDER BY created_at ASC LIMIT 1" cur.execute(query, (task_id,)) result = cur.fetchone() if result is None: return None task: BatchMsgModel = { "id": result[0], "batch_task_id": result[1], "created_at": result[2], "updated_at": result[3], "status": result[4], "type": result[5], "wx_wxid": result[6], "wxid": result[7], "nickname": result[8] } return task def batch_msg_create_many(cur: Cursor, msg_list: list[tuple]): cur.executemany( 'INSERT INTO batch_msg (batch_task_id, created_at, type, wx_wxid, wxid, nickname) VALUES (?,?,?,?,?,?)', msg_list) def batch_msg_update_status(cur: Cursor, msg_id: int, status: int): update_query = "UPDATE batch_msg SET status = ? WHERE id = ?" data_to_update = (status, msg_id) cur.execute(update_query, data_to_update)