import sys from typing import Annotated from urllib.parse import urlparse, quote from uuid import uuid4 from fastapi import Depends from sqlalchemy import URL from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from common.log import log from common.model import MappedBase from core.conf import settings def create_engine_and_session(url: str | URL): try: # 数据库引擎 engine = create_async_engine(url, echo=settings.MYSQL_ECHO, future=True, pool_pre_ping=True) # log.success('数据库连接成功') except Exception as e: log.error('数据库链接失败 {}', e) sys.exit() else: db_session = async_sessionmaker(bind=engine, autoflush=False, expire_on_commit=False) return engine, db_session SQLALCHEMY_DATABASE_URL = ( f'mysql+asyncmy://{settings.MYSQL_USER}:{quote(settings.MYSQL_PASSWORD)}@{settings.MYSQL_HOST}:' f'{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}?charset={settings.MYSQL_CHARSET}' ) async_engine, async_db_session = create_engine_and_session(SQLALCHEMY_DATABASE_URL) async def get_db() -> AsyncSession: """session 生成器""" session = async_db_session() try: yield session except Exception as se: await session.rollback() raise se finally: await session.close() # Session Annotated CurrentSession = Annotated[AsyncSession, Depends(get_db)] async def create_table(): """创建数据库表""" async with async_engine.begin() as coon: await coon.run_sync(MappedBase.metadata.create_all) def uuid4_str() -> str: """数据库引擎 UUID 类型兼容性解决方案""" return str(uuid4())