from datetime import datetime
from tkinter import messagebox

from common.sql_lite import init_new_db_connection
from db.batch_msg import batch_msg_create_many
from db.batch_task import batch_task_create
from service.robot import get_robot
from ui.batch_task.ui_batch_task_create import WinGUIBatchTaskCreate


class WinBatchTaskCreate(WinGUIBatchTaskCreate):
    """Window used to compose a batch task and pick its recipients.

    Attributes:
        label_list: Label names offered in the label select box; always
            starts with "全部".
        label_map: Maps a label name to the id passed to the robot query;
            "全部" maps to 0.
        contacts: ``(wxid, contact_type, name)`` tuples currently shown in
            the contact table, refreshed by :meth:`search`.
    """

    def __init__(self):
        super().__init__()
        self.__event_bind()
        self.label_list = ["全部"]
        self.label_map = {"全部": 0}
        self.contacts = []

    def __event_bind(self):
        """Attach the window's event handlers to their widgets."""
        contact_frame = self.tk_label_frame_contact
        # NOTE(review): the event sequences below are '' and '<>' in this copy
        # of the file, which are not valid Tk event sequences. They look like
        # angle-bracket text lost in extraction (presumably something like a
        # button click, '<<ComboboxSelected>>' and a select-all shortcut) --
        # confirm against the original source before relying on these binds.
        self.tk_button_send.bind('', self.send_event)
        contact_frame.tk_select_box_type.bind('<>', self.on_select_event)
        contact_frame.tk_select_box_label.bind('<>', self.on_select_event)
        contact_frame.tk_table_contact_list.bind('', self.select_all_event)

    def on_select_event(self, event):
        """Refresh the contact table after the type or label box changes.

        The label box is only enabled while the type box reads "联系人";
        for any other type it is disabled.

        Args:
            event: Tk event object supplied by the binding (unused).
        """
        contact_frame = self.tk_label_frame_contact
        contact_type = contact_frame.tk_select_box_type.get()
        label = contact_frame.tk_select_box_label.get()

        label_state = 'normal' if contact_type == "联系人" else 'disabled'
        contact_frame.tk_select_box_label.config(state=label_state)

        self.search(contact_type, label)

    def search(self, contact_type, label):
        """Reload the contact table from the robot for the given filters.

        Args:
            contact_type: Value of the type select box; forwarded to
                ``robot.getFriendOrChatRoomList`` and stored in each row.
            label: Label name; translated to an id through ``label_map``
                (unknown names fall back to 0).
        """
        table = self.tk_label_frame_contact.tk_table_contact_list

        # Clear the table and the cached rows before querying, so a failed
        # query leaves both empty rather than stale.
        table.delete(*table.get_children())
        self.contacts = []

        robot = get_robot()
        found = robot.getFriendOrChatRoomList(contact_type, self.label_map.get(label, 0))
        self.contacts = [
            (contact['wxid'], contact_type, contact['name']) for contact in found
        ]
        for values in self.contacts:
            table.insert('', "end", values=values)

    def send_event(self, event):
        """Persist a batch task plus one message row per chosen recipient.

        Uses the rows selected in the contact table; when nothing is
        selected, every row currently listed is used. The window is closed
        only after the transaction commits. On failure the transaction is
        rolled back, the error is reported and the window stays open so the
        composed text is not lost.

        Args:
            event: Tk event object supplied by the binding (unused).
        """
        content = self.tk_label_frame_content.tk_text_content.get(1.0, "end").strip()
        if not content:
            messagebox.showerror('错误', '发送内容不能为空')
            return

        table = self.tk_label_frame_contact.tk_table_contact_list
        selected_items = table.selection()
        if not selected_items:
            # Fall back to every listed row. This must be the table's item
            # ids (not the cached value tuples in self.contacts), because the
            # ids are what table.item() below can resolve.
            selected_items = table.get_children()
            if not selected_items:
                messagebox.showerror('错误', '请选择联系人或群组')
                return

        connection = init_new_db_connection()
        cursor = connection.cursor()
        try:
            created_at = int(datetime.now().timestamp())
            wx_wxid = get_robot().wcf.get_self_wxid()

            batch_task_id = batch_task_create(cursor, wx_wxid, content, len(selected_items))

            msg_to_insert = []
            for item in selected_items:
                item_values = table.item(item, "values")
                # Rows are (wxid, contact_type, name); type "联系人" is stored
                # as msg_type 1, anything else as 2.
                msg_type = 1 if item_values[1] == "联系人" else 2
                msg_to_insert.append(
                    (batch_task_id, created_at, msg_type, wx_wxid, item_values[0], item_values[2])
                )

            # Insert all message rows in one batch, then commit task + rows
            # together.
            batch_msg_create_many(cursor, msg_to_insert)
            connection.commit()
        except Exception as e:
            # UI boundary: undo the partial write and tell the user instead
            # of silently closing the window.
            connection.rollback()
            print(f"发生错误: {e}")
            messagebox.showerror('错误', f"发生错误: {e}")
            return
        finally:
            # Runs on both the success and the failure path.
            cursor.close()
            connection.close()

        # Close the window only after a successful commit.
        self.destroy()

    def select_all_event(self, event):
        """Add every row of the contact table to the current selection.

        Args:
            event: Tk event object supplied by the binding (unused).
        """
        table = self.tk_label_frame_contact.tk_table_contact_list
        for item in table.get_children():
            table.selection_add(item)


def open_batch_task_create_win():
    """Create the batch-task window, load labels and contacts, and run it.

    Label names and ids come from ``robot.getContactLabelList()``. The label
    box is preset to the first entry, the contact table is filled with an
    initial search, and the call then blocks in the window's ``mainloop``.
    """
    win_batch_task_create = WinBatchTaskCreate()
    contact_frame = win_batch_task_create.tk_label_frame_contact

    # Register each label's name -> id mapping and offer it in the select box.
    robot = get_robot()
    for label in robot.getContactLabelList():
        win_batch_task_create.label_map[label['LabelName']] = label['LabelId']
        win_batch_task_create.label_list.append(label['LabelName'])

    contact_frame.tk_select_box_label['values'] = win_batch_task_create.label_list
    contact_frame.tk_select_box_label.set(win_batch_task_create.label_list[0])

    # Initial fill of the contact table using the boxes' current values.
    win_batch_task_create.search(
        contact_frame.tk_select_box_type.get(),
        contact_frame.tk_select_box_label.get(),
    )

    win_batch_task_create.mainloop()