fastapi 如何实现大文件分片上传 + 断点续传支持

FastAPI可通过HTTP协议特性与后端状态管理实现分片上传和断点续传:1.客户端按固定大小切片并携带file_id、chunk_index等元数据上传;2.FastAPI接口接收分片并用Redis记录进度;3.通过/status接口查询已传分片以支持断点续传;4.所有分片完成后异步合并并校验清理。

FastAPI 本身不直接提供分片上传和断点续传能力,但可以通过标准 HTTP 协议特性(如 Content-RangeETag、自定义请求头) + 后端状态管理(如 Redis 或数据库记录上传进度)来完整实现。核心在于:客户端按固定大小切片、逐片上传、服务端

校验并合并,同时记录每片是否已接收。

1. 客户端分片与上传协议设计

前端需将大文件切分为固定大小的 Blob(如 5MB/片),并为每次请求携带以下信息:

  • file_id:全局唯一标识该次上传(如 UUID),用于关联所有分片
  • chunk_index:当前分片序号(从 0 开始)
  • total_chunks:总分片数
  • filename:原始文件名(可选,用于最终合并时命名)
  • Content-Range:标准格式 bytes 0-5242879/10485760,便于服务端定位偏移

推荐使用 fetchaxios 发送 PUT 请求(比 POST 更符合幂等性语义),并设置 headers: { 'X-File-ID': 'xxx', 'X-Chunk-Index': '0' }

2. FastAPI 接口:接收单个分片

UploadFile 接收二进制流,结合 Redis 记录上传状态(推荐使用 aioredis):

示例代码(简化版):

from fastapi import FastAPI, UploadFile, File, Header, HTTPException
from redis import asyncio as aioredis
import os

app = FastAPI() redis = aioredis.from_url("redis://localhost")

@app.put("/upload/chunk") async def upload_chunk( file: UploadFile = File(...), x_file_id: str = Header(...), x_chunk_index: int = Header(...), x_total_chunks: int = Header(...), content_range: str = Header(...) ):

解析 Content-Range 获取 offset 和 total_size(可选)

if not content_range.startswith("bytes "):
    raise HTTPException(400, "Missing or invalid Content-Range")

# 存储分片到临时目录(如 /tmp/uploads/{file_id}/{index})
chunk_dir = f"/tmp/uploads/{x_file_id}"
os.makedirs(chunk_dir, exist_ok=True)
chunk_path = os.path.join(chunk_dir, str(x_chunk_index))

with open(chunk_path, "wb") as f:
    while chunk := await file.read(8192):
        f.write(chunk)

# 记录该分片已接收(Redis Set 或 Hash)
await redis.sadd(f"upload:{x_file_id}:chunks", str(x_chunk_index))
await redis.hset(f"upload:{x_file_id}:meta", mapping={
    "filename": file.filename,
    "total_chunks": str(x_total_chunks),
    "size": str(file.size)
})

return {"status": "ok", "chunk": x_chunk_index}

3. 断点续传支持:查询已上传分片

客户端在开始上传前,先调用接口获取已成功上传的分片列表,跳过重传:

@app.get("/upload/status")
async def get_upload_status(x_file_id: str = Header(...)):
    # 查询 Redis 中已接收的分片索引
    chunks = await redis.smembers(f"upload:{x_file_id}:chunks")
    meta = await redis.hgetall(f"upload:{x_file_id}:meta")
return {
    "uploaded_chunks": sorted([int(c) for c in chunks]),
    "total_chunks": int(meta.get(b"total_chunks", b"0")),
    "filename": meta.get(b"filename", b"").decode()
}

客户端根据返回的 uploaded_chunks 列表,只上传缺失的索引分片即可实现断点续传。

4. 合并分片 & 清理临时文件

当所有分片接收完成(len(uploaded_chunks) == total_chunks),触发合并逻辑:

  • chunk_index 顺序读取所有临时文件,拼接写入目标路径
  • 校验最终文件 MD5(可选,防止传输损坏)
  • 删除 Redis 中的上传元数据和临时分片目录
  • 返回最终文件 URL 或 ID

合并操作建议异步执行(如用 BackgroundTasks 或 Celery),避免阻塞主请求。注意加锁(如 Redis Lock)防止并发合并同一文件。