12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import asyncio
- import threading
- from app.admin.crud.crud_intent_org import intent_org_dao
- from app.admin.schema.intent_org import CurrentIntentOrgIns
- from batch_task.update_llm_intent import update_llm_intent, process_llm_intent
- from batch_task.update_mismatch_record import update_mismatch_record, process_mismatch_record
- from common.log import log
- from core.conf import settings
- from database.db_mysql import async_db_session
- from utils.serializers import select_as_dict
# Run flag for the batch loop: set by start_batch_task() and execute_task(),
# cleared by stop_batch_task(), and polled by execute_task() before each round.
batch_task_event = threading.Event()
async def periodically_execute(interval, func, *args, **kwargs):
    """Await ``func(*args, **kwargs)`` forever, pausing ``interval`` between calls.

    The first call happens immediately and the pause follows each completed
    call.  The loop has no exit condition of its own: it ends only when an
    exception (including cancellation) propagates out of ``func`` or the
    sleep.

    Args:
        interval: Delay handed to ``asyncio.sleep`` after every call.
        func: Callable whose result is awaited.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.
    """
    while True:
        pending_call = func(*args, **kwargs)
        await pending_call
        await asyncio.sleep(interval)
async def execute_task():
    """Load the enabled intent orgs, then run the batch updates until stopped.

    Orgs whose ``status`` equals 1 are fetched once, up front, and keyed by
    their ``id``.  ``batch_task_event`` is then set, and both update passes
    are run back to back for as long as the event stays set.  A round in
    which both passes return a falsy value is followed by a 10-second pause.
    """
    async with async_db_session.begin() as session:
        fetched_orgs = await intent_org_dao.get_all(session)
        # `or ()` makes a falsy result (e.g. None or an empty list) yield an
        # empty map, matching an explicit "if orgs:" guard.
        org_map = {
            item.id: CurrentIntentOrgIns(**select_as_dict(item))
            for item in (fetched_orgs or ())
            if item.status == 1
        }

    batch_task_event.set()
    workers = settings.BATCH_CONCURRENT
    log.info(f"Starting task with {workers} workers.")

    while batch_task_event.is_set():
        llm_intent_result = await update_llm_intent(org_map, workers)
        mismatch_result = await update_mismatch_record(org_map, workers)
        if not (llm_intent_result or mismatch_result):
            # Neither pass reported anything: back off before polling again.
            log.info("Task executed. Sleeping for 10 seconds...")
            await asyncio.sleep(10)
            continue
        log.info("Task executed")
def start_batch_task():
    """Run the batch task in a new event loop unless it is already flagged as running.

    Does nothing when ``batch_task_event`` is already set.  Otherwise the
    event is set and ``execute_task()`` is driven to completion with
    ``asyncio.run``, so this call blocks the calling thread until the task
    loop ends.

    The event is always cleared when the run ends.  Without that, an
    exception escaping ``execute_task()`` (or ``asyncio.run`` itself) would
    leave the flag set with no task running, and every later call to this
    function would silently do nothing.

    Raises:
        Whatever ``asyncio.run(execute_task())`` raises; the exception is
        propagated after the flag has been reset.
    """
    if batch_task_event.is_set():
        return

    log.info("Starting task.")
    batch_task_event.set()
    try:
        asyncio.run(execute_task())
    finally:
        # After a normal stop the flag is already clear, so this is a no-op;
        # after a crash it re-enables future starts.
        batch_task_event.clear()
def stop_batch_task():
    """Request a stop of the batch loop by clearing ``batch_task_event``.

    Only the flag is changed here; ``execute_task`` checks it before each
    round, so a round that is already in progress is not interrupted.
    """
    # Clear the event so the loop condition in execute_task becomes false.
    batch_task_event.clear()
    log.info("Stop task.")
|