db_mysql.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import sys
  2. from typing import Annotated
  3. from urllib.parse import urlparse, quote
  4. from uuid import uuid4
  5. from fastapi import Depends
  6. from sqlalchemy import URL
  7. from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
  8. from common.log import log
  9. from common.model import MappedBase
  10. from core.conf import settings
  11. def create_engine_and_session(url: str | URL):
  12. try:
  13. # 数据库引擎
  14. engine = create_async_engine(url, echo=settings.MYSQL_ECHO, future=True, pool_pre_ping=True)
  15. # log.success('数据库连接成功')
  16. except Exception as e:
  17. log.error('数据库链接失败 {}', e)
  18. sys.exit()
  19. else:
  20. db_session = async_sessionmaker(bind=engine, autoflush=False, expire_on_commit=False)
  21. return engine, db_session
  22. SQLALCHEMY_DATABASE_URL = (
  23. f'mysql+asyncmy://{settings.MYSQL_USER}:{quote(settings.MYSQL_PASSWORD)}@{settings.MYSQL_HOST}:'
  24. f'{settings.MYSQL_PORT}/{settings.MYSQL_DATABASE}?charset={settings.MYSQL_CHARSET}'
  25. )
  26. async_engine, async_db_session = create_engine_and_session(SQLALCHEMY_DATABASE_URL)
  27. async def get_db() -> AsyncSession:
  28. """session 生成器"""
  29. session = async_db_session()
  30. try:
  31. yield session
  32. except Exception as se:
  33. await session.rollback()
  34. raise se
  35. finally:
  36. await session.close()
  37. # Session Annotated
  38. CurrentSession = Annotated[AsyncSession, Depends(get_db)]
  39. async def create_table():
  40. """创建数据库表"""
  41. async with async_engine.begin() as coon:
  42. await coon.run_sync(MappedBase.metadata.create_all)
  43. def uuid4_str() -> str:
  44. """数据库引擎 UUID 类型兼容性解决方案"""
  45. return str(uuid4())