sqlalchemy 2.x 如何写异步 session + 事务上下文

异步 Session 必须用 AsyncSession 和 async_sessionmaker,不能复用同步 sessionmaker;事务需显式 await session.begin() 或用 async with session.begin(),expire_on_commit=False 防止提交后字段为 None。

异步 Session 必须用 AsyncSession,不能复用同步 sessionmaker

SQLAlchemy 2.x 的异步支持是彻底分离的:同步用 sessionmaker,异步必须用 async_sessionmaker。直接把同步工厂传给 async with 会报 RuntimeError: This event loop is already running 或静默失败。

正确写法是:

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy import text

engine = create_async_engine("sqlite+aiosqlite:///db.sqlite", echo=True) AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

✅ 正确:await 获取 session 实例

async def get_user(user_id: int): async with AsyncSessionLocal() as session: result = await session.execute(text("SELECT * FROM users WHERE id = :id"), {"id": user_id}) return result.fetchone()

  • expire_on_commit=False 很关键——否则 commit() 后对象字段变 None,尤其在返回 Pydantic 模型时直接崩
  • create_async_engine 必须用带 aiosqliteasyncpgaiomysql 的 URL,sqlite:/// 这种同步驱动不支持异步
  • 不要在 async with 外保存 session 引用,它不是线程/协程安全的

事务必须显式 await session.begin() 或用 async with session.begin()

异步 Session 不会自动开启事务。执行 session.execute()session.add() 时,若没处于事务中,SQLAlchemy 会临时开一个只读事务(对写操作无效),导致 commit 报错 InvalidRequestError: N

o transaction is in progress

两种可靠方式:

# 方式一:显式 begin + commit/rollback(适合复杂控制流)
async def transfer_money(from_id: int, to_id: int, amount: float):
    async with AsyncSessionLocal() as session:
        await session.begin()  # ⚠️ 必须 await
        try:
            await session.execute(
                text("UPDATE accounts SET balance = balance - :amt WHERE id = :id"),
                {"amt": amount, "id": from_id}
            )
            await session.execute(
                text("UPDATE accounts SET balance = balance + :amt WHERE id = :id"),
                {"amt": amount, "id": to_id}
            )
            await session.commit()
        except Exception:
            await session.rollback()
            raise

方式二:async with session.begin()(更简洁,自动 rollback on exception)

async def create_post(title: str, content: str): async with AsyncSessionLocal() as session: async with session.begin(): # ✅ 自动 commit,异常时自动 rollback session.add(Post(title=title, content=content))

  • session.begin() 是协程函数,必须 await;漏掉 await 会导致后续操作在无事务上下文中执行
  • async with session.begin() 内部已包含 await session.begin(),无需再手动调
  • session.rollback()session.commit() 也必须 await,否则事务挂起不提交

嵌套 async with 事务会报 InvalidRequestError: Transaction is already begun

SQLAlchemy 2.x 异步不支持真正的嵌套事务(savepoint 是另一回事)。如果外层用了 async with session.begin(),内层再 async with session.begin() 就会触发该错误。

需要“子事务”语义时,改用 session.begin_nested()

async def outer_logic():
    async with AsyncSessionLocal() as session:
        async with session.begin():
            await session.execute(text("INSERT INTO logs (msg) VALUES ('start')"))
        # 子事务:失败不影响外层
        nested = await session.begin_nested()  # ✅ 返回 SavepointTransaction
        try:
            await session.execute(text("INSERT INTO users (name) VALUES ('test')"))
            await nested.commit()  # 提交 savepoint
        except Exception:
            await nested.rollback()  # 回滚到 savepoint,外层仍可 commit

  • begin_nested() 返回的是 SavepointTransaction,不是新 AsyncSession,所有操作仍在同一 session 中
  • savepoint 在 PostgreSQL / MySQL 上有效,SQLite 的 aiosqlite 不支持(会静默退化为普通事务)
  • 别混淆 session.begin_nested()async_sessionmaker(begin_nested=True) —— 后者无效,参数只存在于同步版

依赖注入(如 FastAPI)中传 AsyncSession 要注意生命周期和 scope

FastAPI 的 Depends 默认每次请求新建依赖,但如果你手动 yield 一个未关闭的 AsyncSession,或在中间件里缓存 session 实例,容易引发 ResourceClosedError 或连接泄漏。

安全做法是让依赖函数本身负责 async withyield

from fastapi import Depends

async def get_db(): async with AsyncSessionLocal() as session: yield session # ✅ yield session,而非 yield AsyncSessionLocal()

@app.get("/users/{id}") async def read_user(id: int, session: AsyncSession = Depends(get_db)): result = await session.execute(select(User).where(User.id == id)) return result.scalar_one_or_none()

  • yield 前的 async with 确保 session 在请求结束时自动 close
  • 不要在依赖中 return session —— 那样 session 会在依赖返回后立即被 GC 关闭,后续使用报 ResourceClosedError
  • 若需跨多个 Depends 共享 session(比如 repo 层 + service 层),必须用同一个 get_db 实例,不能定义两个独立依赖

实际用的时候,最常踩的坑是忘了 await 所有 session 方法,以及误以为 async_sessionmaker 创建的是“自动事务 session”。事务边界永远要自己画清楚,async 版本不会替你猜。