12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- import sys
- from typing import Annotated
- from urllib.parse import urlparse, quote
- from uuid import uuid4
- from fastapi import Depends
- from sqlalchemy import URL
- from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
- from common.log import log
- from common.model import MappedBase
- from core.conf import settings
- def create_engine_and_session(url: str | URL):
- try:
- # 数据库引擎
- engine = create_async_engine(url, echo=settings.MYSQL_ECHO, future=True, pool_pre_ping=True)
- # log.success('数据库连接成功')
- except Exception as e:
- log.error('数据库链接失败 {}', e)
- sys.exit()
- else:
- db_session = async_sessionmaker(bind=engine, autoflush=False, expire_on_commit=False)
- return engine, db_session
- SQLALCHEMY_DATABASE_URL = (
- f'mysql+asyncmy://{settings.MYSQL_USER}:{quote(settings.MYSQL_PASSWORD)}@{settings.MYSQL_HOST}:'
- f'{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}?charset={settings.MYSQL_CHARSET}'
- )
- async_engine, async_db_session = create_engine_and_session(SQLALCHEMY_DATABASE_URL)
- async def get_db() -> AsyncSession:
- """session 生成器"""
- session = async_db_session()
- try:
- yield session
- except Exception as se:
- await session.rollback()
- raise se
- finally:
- await session.close()
- # Session Annotated
- CurrentSession = Annotated[AsyncSession, Depends(get_db)]
- async def create_table():
- """创建数据库表"""
- async with async_engine.begin() as coon:
- await coon.run_sync(MappedBase.metadata.create_all)
- def uuid4_str() -> str:
- """数据库引擎 UUID 类型兼容性解决方案"""
- return str(uuid4())
|