123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- from service.robot import get_robot
- from common.sql_lite import init_new_db_connection
- from sqlite3 import Cursor
- import string
def get_contact_by_wxid(wx_id):
    """Look up a display name for *wx_id* in the robot's label list.

    Walks the list returned by ``get_robot().getContactLabelList()`` and
    returns a dict for the first entry whose ``'LabelId'`` equals *wx_id*.

    Returns:
        ``{"nickname": <LabelName>, "wxid": wx_id}`` for the first match,
        or ``None`` when no entry matches.
    """
    # NOTE(review): despite the function name, the lookup compares wx_id with
    # each entry's 'LabelId' and reports its 'LabelName' as the nickname --
    # confirm this is intended rather than a contact-list lookup.
    entries = get_robot().getContactLabelList()
    matched = next((entry for entry in entries if wx_id == entry['LabelId']), None)
    if matched is None:
        return None
    return {
        "nickname": matched['LabelName'],
        "wxid": wx_id,
    }
def add_contact_label(contact_id: str, action_label_add: list[int], action_label_del: list[int],
                      action_label_add_list, action_label_del_list):
    """Add/remove labels on a contact -- currently disabled, always returns ``[]``.

    Args:
        contact_id: Identifier of the contact whose labels should change.
        action_label_add: Label ids to add, e.g. ``[1, 2, 3]``.
        action_label_del: Label ids to remove.
        action_label_add_list: Label names to add, e.g.
            ``["hit task 1", "hit task 2", "hit task 3"]``.
        action_label_del_list: Label names to remove.

    Returns:
        An empty list. No label is created, attached or detached.
    """
    # NOTE(review): the previous body placed an unconditional ``return []``
    # ahead of the real implementation, so that implementation never ran.
    # The unreachable code has been removed instead of re-enabled because it
    # could not have worked as written:
    #   * it appended an undefined name (``add_label_id``);
    #   * it called ``self.wcf.query_sql(...)`` from a module-level function
    #     that has no ``self``;
    #   * it compared label *names* against the id list ``action_label_del``;
    #   * it interpolated unquoted values straight into an
    #     ``update Contact set LabelIDList=... where UserName=...`` statement.
    # Intended flow, for whoever restores it: read the existing labels via
    # ``robot.getContactLabelList()``, create missing ones with
    # ``robot.insertLabel(...)``, then rewrite the contact's ``LabelIDList``
    # using ``label_id_add_format`` / ``label_id_remove_format``.
    return []
def label_id_add_format(label_id, label_id_list):
    """Return *label_id_list* with *label_id* appended unless already present.

    Args:
        label_id: Label id to add. It is compared as text, so ``3`` and
            ``"3"`` are treated the same.
        label_id_list: Comma-separated ids such as ``"1,2,3"``. One leading
            and one trailing comma are tolerated. An empty (or ``None``)
            value means "no labels yet".

    Returns:
        ``",<label_id>,"`` when the input list is empty. Otherwise the list
        with one leading/trailing comma stripped, and with *label_id*
        appended at the end if it was not already in it.
    """
    # The ids obtained from split(',') are always strings, so compare as text;
    # an int id would otherwise never match and could not be joined.
    label_id = str(label_id)
    if not label_id_list:
        return f",{label_id},"

    # Drop at most one comma from each end.
    trimmed = label_id_list.removeprefix(',').removesuffix(',')
    # Skip empty segments so inputs like "," do not produce ",<id>".
    ids = [part for part in trimmed.split(',') if part]
    if label_id in ids:
        return trimmed

    ids.append(label_id)
    # str.join is a method of the separator; lists have no .join().
    return ','.join(ids)
def label_id_remove_format(label_id, label_id_list):
    """Return *label_id_list* with *label_id* removed.

    Args:
        label_id: Label id to remove. It is compared as text, so ``3`` and
            ``"3"`` are treated the same.
        label_id_list: Comma-separated ids such as ``"1,2,3"``. One leading
            and one trailing comma are tolerated.

    Returns:
        ``""`` when the input list is empty (or ``None``). Otherwise the list
        with one leading/trailing comma stripped; if *label_id* was in it,
        every occurrence is removed and the rest is re-joined with commas.
    """
    if not label_id_list:
        return ""

    # The ids obtained from split(',') are always strings, so compare as text;
    # an int id would otherwise never match and nothing would be removed.
    label_id = str(label_id)
    # Drop at most one comma from each end.
    trimmed = label_id_list.removeprefix(',').removesuffix(',')
    ids = trimmed.split(',')
    if label_id not in ids:
        return trimmed

    remaining = [part for part in ids if part and part != label_id]
    # str.join is a method of the separator; lists have no .join().
    return ','.join(remaining)
def str_trim_comma(label_id_list):
    """Strip at most one leading and one trailing comma from *label_id_list*.

    Only a single comma is removed from each end, so ``",,1,,"`` becomes
    ``",1,"``; inner commas are never touched.
    """
    without_leading = label_id_list.removeprefix(',')
    return without_leading.removesuffix(',')
|