别院牧志知识库 别院牧志知识库
首页
  • 基础

    • 全栈之路
    • 😎Awesome资源
  • 进阶

    • Python 工匠系列
    • 高阶知识点
  • 指南教程

    • Socket 编程
    • 异步编程
    • PEP 系列
  • Python 面试题
  • 2025 面试记录
  • 2022 面试记录
  • 2021 面试记录
  • 2020 面试记录
  • 2019 面试记录
  • 数据库索引原理
  • 基金

    • 基金知识
    • 基金经理
  • 细读经典

    • 德隆-三个知道
    • 孔曼子-摊大饼理论
    • 配置者说-躺赢之路
    • 资水-建立自己的投资体系
    • 反脆弱
  • Git 参考手册
  • 提问的智慧
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
首页
  • 基础

    • 全栈之路
    • 😎Awesome资源
  • 进阶

    • Python 工匠系列
    • 高阶知识点
  • 指南教程

    • Socket 编程
    • 异步编程
    • PEP 系列
  • Python 面试题
  • 2025 面试记录
  • 2022 面试记录
  • 2021 面试记录
  • 2020 面试记录
  • 2019 面试记录
  • 数据库索引原理
  • 基金

    • 基金知识
    • 基金经理
  • 细读经典

    • 德隆-三个知道
    • 孔曼子-摊大饼理论
    • 配置者说-躺赢之路
    • 资水-建立自己的投资体系
    • 反脆弱
  • Git 参考手册
  • 提问的智慧
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 工作
  • 规范

  • Linux

  • 数据库

  • Git

  • 👨‍💻Web

  • 英语

  • Docker

  • 编辑器

  • 网络

  • 前端

  • 存储

  • 备忘录

    • 面试准备
    • BZSY 工作记录
    • JJY 开发记录文档
    • bzsy 记录白皮书
    • 订单超时取消场景详解:基于死信队列的优雅实现
      • 业务场景核心需求
        • 关键业务规则
      • 基于死信队列的实现方案
        • 系统架构图
        • 核心组件说明
      • 详细实现步骤
        • 步骤 1:订单创建时发送延迟消息
        • 步骤 2:消息过期成为死信
        • 步骤 3:死信队列消费与订单取消
      • 关键技术优化点
        • 1. 时间精度保障
        • 2. 幂等性设计
        • 3. 失败重试机制
        • 4. 性能优化
      • 与传统方案对比
        • 1. 数据库轮询方案
        • 2. 定时任务方案
        • 3. Redis 键过期方案
      • 生产环境监控指标
        • 报警规则示例
      • 方案优势总结
    • 支付系统策略模式实现
  • 如何开始你的单元测试
  • 以程序员的视角看中国——西安篇
  • 💻工作
  • 备忘录
佚名
2023-03-24
目录

订单超时取消场景详解:基于死信队列的优雅实现

# 业务场景核心需求

# 关键业务规则

  1. 时效性:30 分钟精确控制(±1 分钟内)

  2. 可靠性:即使系统重启,计时不丢失

  3. 可扩展:支持每秒数千订单

  4. 事务性:取消订单需原子操作(回库存+更新状态)

# 基于死信队列的实现方案

# 系统架构图

# 核心组件说明

组件 角色 技术选择
订单服务 业务入口,创建订单和消息 Django
RabbitMQ 延迟消息存储和死信路由 RabbitMQ + 死信交换器
取消服务 执行取消操作的消费者 Python Celery Worker
订单数据库 持久化订单状态 MySQL InnoDB
库存服务 库存扣减与回滚接口 gRPC 微服务

# 详细实现步骤

# 步骤 1:订单创建时发送延迟消息


# 订单服务 (order_service.py)
import pika
import json

def create_order(order_data):
    # 1. 数据库创建订单
    order_id = db.insert_order(order_data)
    
    # 2. 发送延迟消息到RabbitMQ
    connection = pika.BlockingConnection(pika.ConnectionParameters('mq_host'))
    channel = connection.channel()
    
    # 声明带死信参数的订单队列
    args = {
        'x-dead-letter-exchange': 'order_dlx',  # 死信交换器
        'x-dead-letter-routing-key': 'order.cancel',  # 死信路由键
        'x-message-ttl': 30 * 60 * 1000  # 30分钟TTL(毫秒)
    }
    channel.queue_declare(queue='order_queue', arguments=args)
    
    # 发布消息
    message = {
        'order_id': order_id,
        'created_at': datetime.now().isoformat()
    }
    channel.basic_publish(
        exchange='',
        routing_key='order_queue',
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 持久化消息
            headers={'retry_count': 0}  # 初始化重试计数
        )
    )
    connection.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 步骤 2:消息过期成为死信

# 步骤 3:死信队列消费与订单取消

# 取消服务 (cancel_service.py)
import pika
import json
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def consume_dlq():
    connection = pika.BlockingConnection(pika.ConnectionParameters('mq_host'))
    channel = connection.channel()
    
    # 消费死信队列
    def callback(ch, method, properties, body):
        try:
            message = json.loads(body)
            cancel_order(message['order_id'])  # 执行取消
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            handle_failure(message, e)  # 异常处理
            
    channel.basic_consume(queue='dead_letter_queue', on_message_callback=callback)
    channel.start_consuming()

def cancel_order(order_id):
    # 1. 开启数据库事务
    with db.transaction():
        # 2. 检查订单状态(防止重复取消)
        order = db.get_order(order_id)
        if order.status != 'UNPAID':
            return  # 已处理
        
        # 3. 更新订单状态
        db.update_order_status(order_id, 'CANCELLED')
        
        # 4. 释放库存 (gRPC调用)
        inventory_client.release_stock(
            sku=order.sku, 
            quantity=order.quantity
        )
        
        # 5. 记录取消日志
        log_cancellation(order_id)
    
    # 6. 通知用户(异步)
    notify_user.delay(order.user_id, '订单已自动取消')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 关键技术优化点

# 1. 时间精度保障

问题 解决方案 误差控制
RabbitMQ TTL 精度不足 应用层补时 ±5 秒 → ±0.5 秒
服务器时间不同步 使用 NTP 时间同步 ±10 毫秒
消息堆积导致延迟 独立队列+优先级 99%消息准时处理

补时代码示例:

# 在消费者端计算实际延迟
message_time = datetime.fromisoformat(properties.headers['created_at'])
actual_delay = (datetime.now() - message_time).total_seconds()
if actual_delay < 1790:  # 29分50秒
    requeue_with_new_ttl(1790 - actual_delay)  # 重新入队
1
2
3
4
5

# 2. 幂等性设计

# 3. 失败重试机制

def handle_failure(message, exception):
    retry_count = message.properties.headers.get('retry_count', 0)
    if retry_count < 3:
        # 指数退避重试
        delay = 2 ** retry_count * 60  # 1,2,4分钟
        requeue_with_delay(message, delay, retry_count+1)
    else:
        # 人工干预
        alert_ops_team(message, exception)
        archive_message(message)
1
2
3
4
5
6
7
8
9
10

# 4. 性能优化

策略 实现方式 提升效果
批量确认 每 100 条消息 ack 一次 吞吐量↑300%
连接池复用 使用 Celery 连接池 延迟↓40%
并行消费 启动多个 Celery Worker 处理能力线性扩展

# 与传统方案对比

# 1. 数据库轮询方案

-- 每5秒扫描一次
SELECT * FROM orders 
WHERE status = 'UNPAID' 
AND created_at < NOW() - INTERVAL 30 MINUTE;
1
2
3
4

缺点:

  • 高数据库压力(全表扫描)

  • 时间精度差(最大 5 秒延迟)

  • 扩展困难

# 2. 定时任务方案

# 每分钟执行
@scheduler.scheduled_job('interval', minutes=1)
def check_orders():
    find_and_cancel_expired_orders()
1
2
3
4

缺点:

  • 峰值压力(每分钟集中处理)

  • 长事务风险

  • 分布式协调复杂

# 3. Redis 键过期方案

# 设置30分钟过期
redis.setex(f"order:{order_id}", 1800, "pending")
1
2

缺点:

  • 可靠性不足(Redis 持久化间隙可能丢数据)

  • 无状态跟踪

  • 无法携带复杂数据

# 生产环境监控指标

# 报警规则示例

指标 阈值 报警渠道
死信积压量 > 1000 短信+钉钉
取消操作平均延迟 > 3 秒 企业微信
取消失败率 > 1% 邮件+电话
库存释放失败次数 > 5/分钟 钉钉群

# 方案优势总结

  1. 精准时效

    • 通过 TTL+时间补偿实现秒级精度
  2. 可靠保障

    • 消息持久化+死信路由确保不丢单
  3. 资源高效

    • 无轮询开销,CPU 利用率降低 70%
  4. 弹性扩展

    • 独立服务可水平扩展
  5. 故障隔离

    • 订单服务与取消服务解耦

最佳适用场景:

  • 电商订单、票务预约、拍卖竞价等时效敏感业务

  • 需要高可靠定时触发的分布式系统

不适用场景:

  • 需要秒级以下精度的实时交易
  • 无持久化需求的临时任务

ref:

  • 订单支付超时未支付关闭订单的解决方案 - 掘金 (opens new window)
  • 领导:谁再用 redis 过期监听实现关闭订单,立马滚蛋! - 掘金 (opens new window)
  • 订单超时怎么处理?我们用这种方案 (opens new window)

勘误:Redisson 延时队列原理详解 - 知乎 (opens new window)

编辑 (opens new window)
#私人向
上次更新: 2025-06-04, 07:40:06
bzsy 记录白皮书
支付系统策略模式实现

← bzsy 记录白皮书 支付系统策略模式实现→

最近更新
01
本事与投资
06-20
02
Flask 运行周期及工作原理
06-05
03
支付系统策略模式实现
06-04
更多文章>
Theme by Vdoing | Copyright © 2019-2025 IMOYAO | 别院牧志
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式