如何实现 Celery 任务的自动重入队与异常 worker 容错

本文介绍如何通过 `acks_late=true` 和 `reject_on_worker_lost=true` 配合使用,使 celery 在 worker 异常终止(如被 sigkill 杀死)时,自动将未完成任务重新放回队列,避免任务丢失,无需依赖长时 `visibility_timeout`。

Celery 默认采用“预取确认”(ack on receipt)机制:任务一旦被 worker 拉取,即向 Broker 发送 ACK,即使 worker 后续崩溃,Broker 也认为该任务已成功处理,从而导致任务丢失。为解决这一问题,需启用延迟确认(late acknowledgment)worker 失联拒绝机制,二者协同可实现毫秒级故障感知与任务回滚。

✅ 核心配置说明

配置项 作用 推荐值
task_acks_late=True 延迟 ACK 至任务执行完成后发送(而非拉取时),确保失败/中断时任务仍保留在队列中 True
task_reject_on_worker_lost=True 当 worker 进程意外退出(如 SIGKILL、OOM Kill、崩溃)且任务尚未完成时,主动向 Broker 发送 REJECT 并设置 requeue=True,使任务立即重回队列头部 True
⚠️ 注意:reject_on_worker_lost=True 仅在 acks_late=True 生效时起作用;若未启用 acks_late,任务早已被 ACK,Broker 不再管理其生命周期,此参数无效。

? 使用方式(推荐粒度:任务级)

你可在单个任务装饰器中精准控制容错行为,避免全局配置影响其他任务:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379//')

@app.task(acks_late=True, reject_on_worker_lost=True)
def process_payment(order_id: str) -> dict:
    # 模拟可能被中断的长时间操作
    import time
    time.sleep(30)  # 若此时 worker 被 kill -9,任务将自动重入队
    return {"status": "completed", "order_id": order_id}

也可在应用级别统一启用(适用于所有任务):

app.conf.update(
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

? 补充说明与最佳实践

  • Broker 兼容性:该机制在 Redis 和 RabbitMQ 上均稳定支持;若使用 Redis,请确保版本 ≥ 5.0 且未禁用 client-output-buffer-limit 等关键配置。
  • 幂等性必须前置:因任务可能被重复执行,业务逻辑(如扣款、发信)务必设计为幂等,建议结合唯一任务 ID(task_id)或业务单号做去重校验。
  • 不替代健康监控:此方案解决的是「瞬时故障」下的任务兜底,不能替代对 worker 进程、资源、心跳的主动监控(如 Prometheus + Celery Exporter)。
  • 避免滥用 requeue=True:频繁重入队可能导致任务雪崩,建议配合 max_retries=3 和指数退避(countdown)提升鲁棒性:
@app.task(
    acks_late=True,
    reject_on_worker_lost=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 3},
    default_retry_delay=60  # 首次重试延迟 60 秒
)
def fetch_external_data(url: str):
    ...

通过合理组合 acks_late 与 reject_on_worker_lost,你可以在不牺牲吞吐的前提下,显著提升 Celery 任务系统的可靠性与弹性——真正实现“worker 可死,任务不死”。