12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- import json
- from sqlite3 import Cursor
- from typing import TypedDict, Optional
class MsgHistoryModel(TypedDict):
    """Typed view of one ``msg_history`` row as used by the functions in this file.

    ``sender`` and ``roomid`` are optional: ``msg_history_get`` and
    ``msg_history_update`` both accept ``None`` for them, and the insert
    performed by ``msg_history_get`` leaves the column out when the value
    is ``None``.
    """

    # Value matched against the ``wx_wxid`` column in every query in this file.
    wx_wxid: str
    # Value for the ``sender`` column; ``None`` when not supplied.
    sender: Optional[str]
    # Value for the ``roomid`` column; ``None`` when not supplied.
    roomid: Optional[str]
    # JSON-encoded list of messages (written with ``json.dumps``, read with ``json.loads``).
    messages: str
def msg_history_get(cur: Cursor, wx_wxid: str, sender: Optional[str], roomid: Optional[str]) -> list:
    """Return the stored message history for a conversation, creating the row if absent.

    The row is looked up by ``wx_wxid`` plus whichever of ``roomid`` and
    ``sender`` are truthy. If no row matches, a new row with an empty history
    (``"[]"``) is inserted and an empty list is returned. This function does
    not commit; that is left to the caller.

    Args:
        cur: SQLite cursor on a database containing the ``msg_history`` table.
        wx_wxid: Value matched against the ``wx_wxid`` column.
        sender: Optional value for the ``sender`` column.
        roomid: Optional value for the ``roomid`` column.

    Returns:
        The JSON-decoded ``messages`` value of the first matching row, or
        ``[]`` when the row was just created or its ``messages`` value is
        NULL/empty.

    Raises:
        json.JSONDecodeError: If the stored ``messages`` text is non-empty but
            is not valid JSON.
    """
    # Build the lookup filter. Column names are fixed literals, so formatting
    # them into the SQL text is safe; all values go through placeholders.
    # NOTE(review): a missing roomid/sender is simply left out of the filter
    # (not matched as NULL), so the lookup may return a row that has a value
    # in that column -- confirm this is intended.
    fields = ["wx_wxid"]
    params = [wx_wxid]
    if roomid:
        fields.append("roomid")
        params.append(roomid)
    if sender:
        fields.append("sender")
        params.append(sender)
    where_clause = " AND ".join(f"{field} = ?" for field in fields)

    # Select the column by name: with ``SELECT *`` the position of
    # ``messages`` depends on the table's column order (e.g. a leading
    # ``id`` column would shift it).
    cur.execute(f"SELECT messages FROM msg_history WHERE {where_clause} LIMIT 1", params)
    row = cur.fetchone()

    if row is None:
        # No history yet: create the row with an empty JSON list.
        # The insert keeps the ``is not None`` test so that an empty string
        # passed by the caller is still written to the column.
        insert_fields = ["wx_wxid", "messages"]
        insert_values = [wx_wxid, "[]"]
        if roomid is not None:
            insert_fields.append("roomid")
            insert_values.append(roomid)
        if sender is not None:
            insert_fields.append("sender")
            insert_values.append(sender)
        placeholders = ", ".join("?" for _ in insert_fields)
        cur.execute(
            f"INSERT INTO msg_history ({', '.join(insert_fields)}) VALUES ({placeholders})",
            insert_values,
        )
        return []

    raw_messages = row[0]
    # A NULL or empty ``messages`` value is treated as "no history" instead of
    # letting json.loads fail on it.
    if not raw_messages:
        return []
    return json.loads(raw_messages)
def msg_history_update(cur: Cursor, wx_wxid: str, history_messages: list, sender: Optional[str],
                       roomid: Optional[str], *, max_messages: int = 5):
    """Store the most recent messages of a conversation in ``msg_history``.

    Only the last ``max_messages`` entries of ``history_messages`` are kept;
    they are JSON-encoded and written to the ``messages`` column of every row
    matching ``wx_wxid`` plus whichever of ``roomid`` and ``sender`` are
    truthy. If no row matches, nothing is written. This function does not
    commit; that is left to the caller.

    Args:
        cur: SQLite cursor on a database containing the ``msg_history`` table.
        wx_wxid: Value matched against the ``wx_wxid`` column.
        history_messages: Full message list; it is not modified.
        sender: Optional value matched against the ``sender`` column.
        roomid: Optional value matched against the ``roomid`` column.
        max_messages: How many trailing messages to keep (default 5).
            ``0`` stores an empty list.

    Raises:
        ValueError: If ``max_messages`` is negative.
    """
    if max_messages < 0:
        raise ValueError("max_messages must be non-negative")

    # ``lst[-0:]`` would return the whole list, so zero is handled explicitly.
    recent = history_messages[-max_messages:] if max_messages else []
    messages_str = json.dumps(recent)

    # The first placeholder belongs to the SET clause; the rest follow the
    # WHERE fields in order. Column names are fixed literals, values are bound.
    fields = ["wx_wxid"]
    params = [messages_str, wx_wxid]
    if roomid:
        fields.append("roomid")
        params.append(roomid)
    if sender:
        fields.append("sender")
        params.append(sender)

    where_clause = " AND ".join(f"{field} = ?" for field in fields)
    cur.execute(f"UPDATE msg_history SET messages = ? WHERE {where_clause}", params)
|