"""Background batch task: repeatedly run the LLM-intent and mismatch-record updates.

The loop is controlled by the module-level ``batch_task_event``: it runs while
the event is set and exits once the event is cleared.
"""

import asyncio
import threading
from typing import Any, Awaitable, Callable

from app.admin.crud.crud_intent_org import intent_org_dao
from app.admin.schema.intent_org import CurrentIntentOrgIns
from batch_task.update_llm_intent import update_llm_intent, process_llm_intent
from batch_task.update_mismatch_record import update_mismatch_record, process_mismatch_record
from common.log import log
from core.conf import settings
from database.db_mysql import async_db_session
from utils.serializers import select_as_dict

# Set while the batch loop should keep running; cleared to request a stop.
batch_task_event = threading.Event()

# Seconds to wait before polling again when a round found nothing to process.
_IDLE_SLEEP_SECONDS = 10


async def periodically_execute(
    interval: float,
    func: Callable[..., Awaitable[Any]],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Await ``func(*args, **kwargs)`` forever, sleeping ``interval`` seconds after each call.

    This coroutine never returns on its own; it ends only if ``func`` raises
    or the surrounding task is cancelled.
    """
    while True:
        await func(*args, **kwargs)
        await asyncio.sleep(interval)


async def execute_task() -> None:
    """Run the batch update loop until ``batch_task_event`` is cleared.

    Loads every organisation through ``intent_org_dao.get_all``, keeps those
    whose ``status`` equals 1, and then alternates ``update_llm_intent`` and
    ``update_mismatch_record`` for as long as the event stays set. When both
    updates return a falsy value the loop sleeps before polling again, so a
    stop request may take up to that long to be noticed.
    """
    async with async_db_session.begin() as db:
        orgs = await intent_org_dao.get_all(db)
        # Built inside the session block, exactly where the original
        # serialised the rows; ``orgs`` may be empty or falsy.
        org_map = {
            org.id: CurrentIntentOrgIns(**select_as_dict(org))
            for org in (orgs or ())
            if org.status == 1
        }

    # Setting the event here means the loop below runs even when the caller
    # did not set it beforehand.
    batch_task_event.set()
    workers = settings.BATCH_CONCURRENT
    log.info(f"Starting task with {workers} workers.")

    while batch_task_event.is_set():
        len_llm_intent = await update_llm_intent(org_map, workers)
        len_mismatch_record = await update_mismatch_record(org_map, workers)
        if len_llm_intent or len_mismatch_record:
            # Something was processed: go straight into the next round.
            log.info("Task executed")
        else:
            log.info(f"Task executed. Sleeping for {_IDLE_SLEEP_SECONDS} seconds...")
            await asyncio.sleep(_IDLE_SLEEP_SECONDS)


def start_batch_task() -> None:
    """Start the batch loop unless the event says one is already running.

    ``asyncio.run`` blocks the calling thread until ``execute_task`` finishes,
    so this returns only after the loop has stopped or failed. Any exception
    raised by the task propagates to the caller.
    """
    if batch_task_event.is_set():
        return

    log.info("Starting task.")
    batch_task_event.set()
    try:
        asyncio.run(execute_task())
    finally:
        # Without this, a failure inside the task (or in asyncio.run itself)
        # left the event set, and every later start_batch_task() call became
        # a silent no-op.
        batch_task_event.clear()


def stop_batch_task() -> None:
    """Ask the batch loop to stop by clearing ``batch_task_event``.

    The loop checks the event only between rounds, so it exits after the
    current round (and any idle sleep) completes.
    """
    # Clear the event.
    batch_task_event.clear()
    log.info("Stop task.")