msg_history.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. import json
  2. from sqlite3 import Cursor
  3. from typing import TypedDict, Optional
class MsgHistoryModel(TypedDict):
    """Typed dictionary describing one message-history record.

    The field names match the column names used by the ``msg_history`` SQL
    statements in this module (``wx_wxid``, ``sender``, ``roomid``,
    ``messages``).
    """

    # Account identifier; always supplied by the query helpers in this module.
    wx_wxid: str
    # NOTE(review): the helpers accept ``Optional[str]`` for sender and omit
    # the column on insert when it is None, so a stored value may be NULL --
    # confirm whether this should be ``Optional[str]``.
    sender: str
    # NOTE(review): same caveat as ``sender`` -- may be NULL in storage.
    roomid: str
    # JSON text: the helpers write ``"[]"`` or ``json.dumps(...)`` output here.
    messages: str
  9. def msg_history_get(cur: Cursor, wx_wxid: str, sender: Optional[str], roomid: Optional[str]) -> list:
  10. # 查询历史消息
  11. fields = ["wx_wxid"]
  12. params = [wx_wxid]
  13. if roomid:
  14. fields.append("roomid")
  15. params.append(roomid)
  16. if sender:
  17. fields.append("sender")
  18. params.append(sender)
  19. query = f"SELECT * FROM msg_history WHERE {' AND '.join([f'{field} = ?' for field in fields])} LIMIT 1"
  20. cur.execute(query, params)
  21. result = cur.fetchone()
  22. if result is None:
  23. fields = ["wx_wxid", "messages"]
  24. values = [wx_wxid, "[]"]
  25. if roomid is not None:
  26. fields.append("roomid")
  27. values.append(roomid)
  28. if sender is not None:
  29. fields.append("sender")
  30. values.append(sender)
  31. operation = f"INSERT INTO msg_history ({', '.join(fields)}) VALUES ({', '.join(['?' for _ in fields])})"
  32. cur.execute(operation, values)
  33. content = []
  34. else:
  35. content = json.loads(result[3])
  36. return content
  37. def msg_history_update(cur: Cursor, wx_wxid: str, history_messages: list, sender: Optional[str],
  38. roomid: Optional[str]):
  39. messages_str = json.dumps(history_messages[-5:])
  40. fields = ["wx_wxid"]
  41. params = [messages_str, wx_wxid]
  42. if roomid:
  43. fields.append("roomid")
  44. params.append(roomid)
  45. if sender:
  46. fields.append("sender")
  47. params.append(sender)
  48. update_query = f"UPDATE msg_history SET messages = ? WHERE {' AND '.join([f'{field} = ?' for field in fields])}"
  49. cur.execute(update_query, params)