123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261 |
- import json
- from sqlite3 import Cursor
- from typing import TypedDict, Optional
- from config import conf
"""
sop_task: model and CRUD helpers
"""
# Display label for each sop_task status code.
sop_task_status = {
    1: "草稿",  # draft
    2: "未开始",  # not started
    3: "已开始",  # started
    4: "已结束",  # finished
    5: "已禁用",  # disabled
}
# Label type codes: 1 = friend, 2 = group, 3 = WeCom (enterprise WeChat) contact.
sop_task_type = {
    1: "好友",  # friend
    2: "群组",  # group
    3: "企业微信联系人",  # WeCom contact
}
class SopTaskModel(TypedDict):
    """Dictionary shape of one ``sop_task`` row as built by the query helpers."""

    id: int
    sop_task_id: int
    # NOTE(review): sop_task_create writes ``int(ms) / 1000`` (a float) into
    # the two timestamp columns; confirm the column affinity before relying
    # on these being integers.
    created_at: int
    updated_at: int
    name: str
    # JSON-encoded text as produced by json.dumps, not a parsed object.
    # (The ``json`` module itself is not a valid type annotation.)
    bot_wxid_list: str
    status: int  # 1-draft 2-not started 3-started 4-finished 5-disabled
    type: int  # label type: 1-friend, 2-group, 3-WeCom contact
    wx_wxid: str
    token: str
def sop_task_get_list(cur: Cursor, wx_wxid: str) -> "list[SopTaskModel]":
    """Return the started (3) and disabled (5) tasks of one account, newest first.

    Args:
        cur: Cursor on the database that holds the ``sop_task`` table.
        wx_wxid: Value matched against the ``wx_wxid`` column.

    Returns:
        One dict per matching row, ordered by ``created_at`` descending;
        an empty list when nothing matches.
    """
    # Columns are selected by name so the row -> dict mapping cannot drift if
    # the table's physical column order changes (``SELECT *`` plus positional
    # indexing would silently mislabel values). The names are fixed constants,
    # so interpolating them into the SQL text is safe.
    columns = (
        "id",
        "sop_task_id",
        "created_at",
        "updated_at",
        "name",
        "bot_wxid_list",
        "status",
        "type",
        "wx_wxid",
        "token",
    )
    query = (
        f"SELECT {', '.join(columns)} FROM sop_task "
        "WHERE wx_wxid=? and (status=? OR status=?) ORDER BY created_at DESC"
    )
    cur.execute(query, (wx_wxid, 3, 5))
    return [dict(zip(columns, row)) for row in cur.fetchall()]
def sop_task_query(cur: Cursor, task_id: int) -> "Optional[SopTaskModel]":
    """Return the newest ``sop_task`` row whose ``sop_task_id`` equals ``task_id``.

    Args:
        cur: Cursor on the database that holds the ``sop_task`` table.
        task_id: Value matched against the ``sop_task_id`` column.

    Returns:
        The matching row as a dict (the one with the greatest ``created_at``
        when several match), or ``None`` when there is no such row.
    """
    # Select columns by name: mapping ``SELECT *`` by position would silently
    # mislabel values if the table's column order ever changed. The names are
    # fixed constants, so interpolating them into the SQL text is safe.
    columns = (
        "id",
        "sop_task_id",
        "created_at",
        "updated_at",
        "name",
        "bot_wxid_list",
        "status",
        "type",
        "wx_wxid",
        "token",
    )
    query = (
        f"SELECT {', '.join(columns)} FROM sop_task "
        "WHERE sop_task_id = ? ORDER BY created_at DESC LIMIT 1"
    )
    cur.execute(query, (task_id,))
    row = cur.fetchone()
    if row is None:
        return None
    return dict(zip(columns, row))
def sop_task_create(cur: Cursor, msg, wx_wxid):
    """Insert one ``sop_task`` row built from ``msg`` and return ``cur.lastrowid``.

    Args:
        cur: Cursor used to run the INSERT.
        msg: Mapping read for the keys ``botWxidList``, ``createdAt``,
            ``updatedAt``, ``id``, ``name``, ``status`` and ``type``.
        wx_wxid: Value stored in the ``wx_wxid`` column.

    Returns:
        ``cur.lastrowid`` after the INSERT.
    """
    bot_wxids_json = json.dumps(msg['botWxidList'], ensure_ascii=False)
    # Timestamps are divided by 1000 (presumably milliseconds -> seconds;
    # confirm with the producer of ``msg``). True division yields a float.
    created_at, updated_at = (
        int(msg[key]) / 1000 for key in ('createdAt', 'updatedAt')
    )
    # The token comes from the project configuration and is stored as text.
    token = str(conf().get("token"))
    row = (
        msg['id'],
        created_at,
        updated_at,
        msg['name'],
        bot_wxids_json,
        msg['status'],
        msg['type'],
        wx_wxid,
        token,
    )
    cur.execute(
        "INSERT INTO sop_task (sop_task_id, created_at, updated_at, name, bot_wxid_list, status, type, wx_wxid, token) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        row,
    )
    return cur.lastrowid
def sop_task_update_status(cur: Cursor, id: int, status: int):
    """Set the ``status`` column of the ``sop_task`` row whose ``id`` matches.

    Args:
        cur: Cursor used to run the UPDATE.
        id: Value matched against the ``id`` column.
        status: New value for the ``status`` column.
    """
    cur.execute("UPDATE sop_task SET status = ? WHERE id = ?", (status, id))
"""
sop_stage: model and CRUD helpers
"""
class SopStageModel(TypedDict):
    """Dictionary shape of one ``sop_stage`` row."""

    id: int
    # NOTE(review): sop_stage_create_many writes ``int(ms) / 1000`` (a float)
    # into the two timestamp columns; confirm the column affinity.
    created_at: int
    updated_at: int
    name: str
    status: int  # 1-normal 2-disabled
    task_id: int
    condition_type: int
    condition_operator: int
    # The fields below hold JSON-encoded text as written by json.dumps in
    # sop_stage_create_many, so they are typed ``str`` (the ``json`` module
    # itself is not a valid type annotation).
    condition_list: str
    action_message: str
    action_label_add: str
    action_label_del: str
    action_forward: str
    index_sort: int
def sop_stage_create_many(cur: Cursor, msg_list: list[dict]):
    """Insert one ``sop_stage`` row per entry of ``msg_list``.

    Args:
        cur: Cursor used to run the INSERT statements.
        msg_list: Mappings read for the keys ``conditionList``,
            ``actionMessage``, ``actionLabelAdd``, ``actionLabelDel``,
            ``actionForward``, ``createdAt``, ``updatedAt``, ``name``,
            ``status``, ``taskId``, ``conditionType``, ``conditionOperator``
            and ``indexSort``.
    """
    insert_sql = 'INSERT INTO sop_stage (created_at, updated_at, name, status, task_id, condition_type, condition_operator, condition_list, action_message, action_label_add, action_label_del, action_forward, index_sort) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)'
    json_keys = (
        'conditionList',
        'actionMessage',
        'actionLabelAdd',
        'actionLabelDel',
        'actionForward',
    )
    for stage in msg_list:
        # Structured fields are stored as JSON text; ensure_ascii=False keeps
        # non-ASCII characters unescaped.
        condition_list, action_message, label_add, label_del, forward = (
            json.dumps(stage[key], ensure_ascii=False) for key in json_keys
        )
        # Timestamps are divided by 1000 (presumably milliseconds -> seconds;
        # confirm with the producer). True division yields a float.
        created_at = int(stage['createdAt']) / 1000
        updated_at = int(stage['updatedAt']) / 1000
        row = (
            created_at,
            updated_at,
            stage['name'],
            stage['status'],
            stage['taskId'],
            stage['conditionType'],
            stage['conditionOperator'],
            condition_list,
            action_message,
            label_add,
            label_del,
            forward,
            stage['indexSort'],
        )
        cur.execute(insert_sql, row)
def get_stage(cursor: Cursor, organization_id: int):
    """Return the enabled stages (status = 1) of an organization's started tasks (status = 3).

    Args:
        cursor: Cursor on the database holding ``sop_task`` and ``sop_stage``.
        organization_id: Value matched against ``sop_task.organization_id``.
            NOTE(review): sop_task_create in this file never writes an
            ``organization_id`` column; confirm the column exists.

    Returns:
        A list of ``sop_stage`` rows (in whatever row type the cursor
        produces), grouped task by task; empty when nothing matches.
    """
    # Only the task id is needed, so select just that column.
    cursor.execute(
        "SELECT id FROM sop_task WHERE status = 3 AND organization_id = ?",
        (organization_id,),
    )
    tasks = cursor.fetchall()

    stages = []
    for task in tasks:
        # sqlite3's default rows are plain tuples, for which ``task['id']``
        # raises TypeError. Positional access works for tuples and
        # sqlite3.Row; dict-producing row factories are still honoured.
        task_id = task["id"] if isinstance(task, dict) else task[0]
        cursor.execute(
            "SELECT * FROM sop_stage WHERE task_id = ? AND status = 1",
            (task_id,),
        )
        stages += cursor.fetchall()
    return stages
"""
sop_node: model and CRUD helpers
"""
class SopNodeModel(TypedDict):
    """Dictionary shape of one ``sop_node`` row as built by the query helpers."""

    id: int
    # NOTE(review): sop_node_create_many writes ``int(ms) / 1000`` (a float)
    # into the two timestamp columns; confirm the column affinity.
    created_at: int
    updated_at: int
    name: str
    stage_id: int
    parent_id: int
    status: int  # 1-normal 2-disabled
    condition_type: int
    # JSON-encoded text as written by json.dumps in sop_node_create_many
    # (the ``json`` module itself is not a valid type annotation).
    condition_list: str
    no_reply_condition: int
    no_reply_unit: str
    action_message: str
    action_label_add: str
    action_label_del: str
    action_forward: str
    # sop_node_create_many stores None for these two when the incoming value
    # is None, so they may be absent.
    action_label_add_list: Optional[str]
    action_label_del_list: Optional[str]
def sop_node_create_many(cur: Cursor, msg_list: list[dict]):
    """Insert one ``sop_node`` row per entry of ``msg_list``.

    Args:
        cur: Cursor used to run the INSERT statements.
        msg_list: Mappings read for the keys ``conditionList``,
            ``actionMessage``, ``actionLabelAdd``, ``actionLabelDel``,
            ``actionForward``, ``actionLabelAddList``, ``actionLabelDelList``,
            ``createdAt``, ``updatedAt``, ``name``, ``stageId``, ``parentId``,
            ``status``, ``conditionType``, ``noReplyCondition`` and
            ``noReplyUnit``.
    """
    sql = 'INSERT INTO sop_node (created_at, updated_at, name, stage_id, parent_id, status, condition_type, condition_list, no_reply_condition, no_reply_unit, action_message, action_label_add, action_label_del, action_forward, action_label_add_list, action_label_del_list) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
    always_json = (
        'conditionList',
        'actionMessage',
        'actionLabelAdd',
        'actionLabelDel',
        'actionForward',
    )
    for node in msg_list:
        # Structured fields are stored as JSON text; ensure_ascii=False keeps
        # non-ASCII characters unescaped.
        condition_list, action_message, label_add, label_del, forward = (
            json.dumps(node[key], ensure_ascii=False) for key in always_json
        )
        # The two *List fields stay NULL when the incoming value is None
        # instead of being stored as the JSON text "null".
        add_list = node['actionLabelAddList']
        if add_list is not None:
            add_list = json.dumps(add_list, ensure_ascii=False)
        del_list = node['actionLabelDelList']
        if del_list is not None:
            del_list = json.dumps(del_list, ensure_ascii=False)
        # Timestamps are divided by 1000 (presumably milliseconds -> seconds;
        # confirm with the producer). True division yields a float.
        created_at = int(node['createdAt']) / 1000
        updated_at = int(node['updatedAt']) / 1000
        data = (
            created_at,
            updated_at,
            node['name'],
            node['stageId'],
            node['parentId'],
            node['status'],
            node['conditionType'],
            condition_list,
            node['noReplyCondition'],
            node['noReplyUnit'],
            action_message,
            label_add,
            label_del,
            forward,
            add_list,
            del_list,
        )
        cur.execute(sql, data)
def sop_node_get_list_by_parent(cur: Cursor, parent_id: int, stage_id: Optional[int]) -> "list[SopNodeModel]":
    """Return the ``sop_node`` rows under ``parent_id``, optionally limited to one stage.

    Args:
        cur: Cursor on the database that holds the ``sop_node`` table.
        parent_id: Value matched against the ``parent_id`` column.
        stage_id: When not ``None``, rows must also have this ``stage_id``.

    Returns:
        One dict per matching row; an empty list when nothing matches.
    """
    # Columns are selected by name so the row -> dict mapping cannot drift if
    # the table's physical column order changes (``SELECT *`` plus positional
    # indexing would silently mislabel values). Assumes the key column is
    # named ``id``, as SopNodeModel implies -- TODO confirm against the schema.
    columns = (
        "id",
        "created_at",
        "updated_at",
        "name",
        "stage_id",
        "parent_id",
        "status",
        "condition_type",
        "condition_list",
        "no_reply_condition",
        "no_reply_unit",
        "action_message",
        "action_label_add",
        "action_label_del",
        "action_forward",
        "action_label_add_list",
        "action_label_del_list",
    )
    sql_query = f"SELECT {', '.join(columns)} FROM sop_node WHERE parent_id = ?"
    params = (parent_id,)
    if stage_id is not None:
        sql_query += " AND stage_id = ?"
        params += (stage_id,)
    cur.execute(sql_query, params)
    return [dict(zip(columns, row)) for row in cur.fetchall()]
def sop_node_get_list_by_condition(cur: Cursor) -> "list[SopNodeModel]":
    """Return every ``sop_node`` row whose ``no_reply_condition`` is 0.

    Args:
        cur: Cursor on the database that holds the ``sop_node`` table.

    Returns:
        One dict per matching row; an empty list when nothing matches.
    """
    # Columns are selected by name so the row -> dict mapping cannot drift if
    # the table's physical column order changes (``SELECT *`` plus positional
    # indexing would silently mislabel values). Assumes the key column is
    # named ``id``, as SopNodeModel implies -- TODO confirm against the schema.
    columns = (
        "id",
        "created_at",
        "updated_at",
        "name",
        "stage_id",
        "parent_id",
        "status",
        "condition_type",
        "condition_list",
        "no_reply_condition",
        "no_reply_unit",
        "action_message",
        "action_label_add",
        "action_label_del",
        "action_forward",
        "action_label_add_list",
        "action_label_del_list",
    )
    sql_query = f"SELECT {', '.join(columns)} FROM sop_node WHERE no_reply_condition = 0"
    cur.execute(sql_query)
    return [dict(zip(columns, row)) for row in cur.fetchall()]
|