发布时间:2026/6/15 0:57:55
Redis 从入门到精通:Redis Stream —— 可靠消息队列
IT策士 10余年一线大厂经验专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章助你少走弯路。前面我们学了 List 做队列、Pub/Sub 做广播但它们都有一个硬伤消息可靠性不足。List 弹出的消息就没了客户端崩溃则消息丢失Pub/Sub 干脆不持久化订阅者不在线时消息直接蒸发。对于订单处理、异步任务、日志收集这类不能丢消息的场景需要的是可靠消息队列。Redis 5.0 推出的Stream正是为此而生。它像 Kafka 一样支持消息持久化、消费者组、ACK 确认和消息回溯又保持了 Redis 的简洁与高性能。本文将带你从命令到 Python 实战用 Stream 构建一个真正生产可用的消息队列。1. Stream 是什么为什么要用Stream 是 Redis 追加日志型数据结构用于存储时间序列的消息。每条消息有一个全局唯一的 ID 和若干键值对。它的核心能力消息持久化消息写入 Stream 后不会因消费者离线而丢失。消费者组同组消费者竞争消费同一条消息实现负载均衡。ACK 确认消费者处理完后发送XACK消息从“待确认”变为“已确认”。消息回溯可以按 ID 重新消费历史消息不会像 List 一样弹出即销毁。阻塞读取XREAD可阻塞等待新消息避免空轮询。对比其他队列方案一句话总结要可靠上 Stream。2. 核心命令速览2.1 添加消息XADD127.0.0.1:6379XADD orders * action create user_id1001amount99.91680000000000-0orders是 Stream 的 key。*表示让 Redis 自动生成消息 ID格式毫秒时间戳-序号。后面跟着若干 field-value 对构成了消息体。返回自动生成的 ID。可以手动指定 ID但强烈建议用*自动生成保证单调递增。2.2 读取消息XREAD# 读取所有消息从头开始127.0.0.1:6379XREAD STREAMS orders0-01)1)orders2)1)1)1680000000000-02)1)action2)create3)user_id4)10015)amount6)99.9# 阻塞等待新消息类似 BRPOP127.0.0.1:6379XREAD BLOCK5000STREAMS orders $(nil)# 5秒内没有新消息返回空0-0表示从头读$表示只读最新类似 tail -f。BLOCK毫秒数0 表示永久阻塞。2.3 消费者组与 XREADGROUPStream 可以创建多个消费者组每组独立维护消费进度。同组内的消费者竞争消费各自处理完后 ACK。# 创建消费者组从头部开始消费127.0.0.1:6379XGROUP CREATE orders group10-0 OK# 消费者 A 读取 group1 未确认的消息 表示从未消费过的新消息127.0.0.1:6379XREADGROUP GROUP group1 consumerA COUNT1STREAMS orders1)1)orders2)1)1)1680000000000-02)1)action2)create...# 确认消息处理完毕127.0.0.1:6379XACK orders group11680000000000-0(integer)1只返回从未投递给任何消费者组内成员的新消息。XACK将消息标记为已处理从待确认列表移除。2.4 查看待处理消息XPENDING如果有消费者拿到消息后崩溃消息会一直处于待确认状态XPENDING可以查看这些消息。# 查看待处理消息概要127.0.0.1:6379XPENDING orders group11)(integer)0# 待确认消息数# 如果消费者 A 挂掉消息会显示在 pending 列表中127.0.0.1:6379XPENDING orders group1 - 10(列出具体的待处理消息及其空闲时间)2.5 消息转移XCLAIM当一个消费者长时间未 ACK可能已死可以由另一个消费者通过XCLAIM将消息“抢”过来处理。127.0.0.1:6379XCLAIM orders group1 consumerB600001680000000000-0把空闲超过 60000 毫秒的消息转交给consumerB。3. Python 实战订单处理系统我们用 Stream 构建一个订单处理流水线一个生产者发布订单多个消费者组成消费组并行处理订单处理失败的消息重试或转移。3.1 环境准备确保 Redis 版本 ≥ 5.0Docker 镜像redis:7.2满足。3.2 生产者发布订单importredisimporttimeimportjsonimportrandom rredis.Redis(hostlocalhost,port6379,decode_responsesTrue)STREAM_KEYordersGROUP_NAMEorder_processors# 创建消费者组如果不存在try: r.xgroup_create(STREAM_KEY, GROUP_NAME,id0-0,mkstreamTrue)print(f消费者组 {GROUP_NAME} 已创建)except redis.exceptions.ResponseError as e:ifBUSYGROUPinstr(e): print(f消费者组 {GROUP_NAME} 已存在)else: raise def publish_order(order_id, user_id, amount):发布订单消息 msg{order_id:order_id,user_id:user_id,amount:amount,timestamp:time.time()}msg_idr.xadd(STREAM_KEY, msg)print(f[生产者] 发布订单 {order_id}: ID {msg_id})returnmsg_id# 模拟发布 10 个订单foriinrange(1,11): publish_order(fORD-{1000i}, random.randint(1,100), round(random.uniform(10,500),2))time.sleep(0.5)输出示例消费者组 order_processors 已存在[生产者]发布订单 ORD-1001: ID1680000001234-0[生产者]发布订单 ORD-1002: ID1680000001735-0...3.3 消费者处理订单并 ACK每个消费者从组中读取新消息模拟处理如扣减库存成功后 ACK。def process_order(msg_id, msg_data):模拟订单处理成功返回 True失败返回 False order_idmsg_data.get(order_id,unknown)amountfloat(msg_data.get(amount,0))print(f[消费者] 处理订单 {order_id} 金额 {amount})# 模拟处理随机成功或失败80% 成功successrandom.random()0.8ifsuccess: print(f[消费者] 订单 {order_id} 处理成功)else: print(f[消费者] 订单 {order_id} 处理失败)returnsuccess def start_consumer(consumer_name):启动一个消费者持续读取并处理消息 print(f[消费者 {consumer_name}] 启动)whileTrue: try:# 读取新消息每次最多 1 条阻塞 2 秒resultr.xreadgroup(GROUP_NAME, consumer_name,{STREAM_KEY:},count1,block2000)ifnot result:# 没有消息尝试处理本消费者的 pending 消息pendingr.xpending(STREAM_KEY, GROUP_NAME)ifpending[pending]0:# 读取自己的 pending 消息pending_msgsr.xpending_range(STREAM_KEY, GROUP_NAME,min-,max,count1,consumernameconsumer_name)ifpending_msgs:forpinpending_msgs: msg_idp[message_id]# 重新获取消息内容msgsr.xrange(STREAM_KEY,minmsg_id,maxmsg_id)ifmsgs: msg_datamsgs[0][1]print(f[消费者 {consumer_name}] 重试 pending 消息 {msg_id})ifprocess_order(msg_id, msg_data): r.xack(STREAM_KEY, GROUP_NAME, msg_id)continuestream_name, messagesresult[0]formsg_id, msg_datainmessages: print(f[消费者 {consumer_name}] 收到消息 {msg_id})ifprocess_order(msg_id, msg_data): r.xack(STREAM_KEY, GROUP_NAME, msg_id)else:# 处理失败不 ACK消息留在 pending 中# 后续可以由 XCLAIM 转移或手动重试pass except Exception as e: print(f[消费者 {consumer_name}] 异常: {e})time.sleep(1)# 启动消费者if__name____main__:importsys consumer_namesys.argv[1]iflen(sys.argv)1elseconsumer-1start_consumer(consumer_name)启动多个消费者终端python consumer.py consumer-Apython consumer.py consumer-B生产者发布消息后消费者输出示例[消费者 consumer-A]收到消息1680000001234-0[消费者]处理订单 ORD-1001 金额99.90[消费者]订单 ORD-1001 处理成功[消费者 consumer-B]收到消息1680000001735-0[消费者]处理订单 ORD-1002 金额250.00[消费者]订单 ORD-1002 处理失败注意消息不会重复消费A 和 B 竞争。3.4 处理失败消息死信队列与重试对于长时间 pending 的消息消费者崩溃或处理失败可以定时扫描用XCLAIM转移给健康消费者或超过最大重试次数后移入死信 Stream。def recover_pending(stream_key, group_name,idle_ms60000,max_retries3):恢复空闲消息将超时的 pending 消息转移给活跃消费者# 获取所有 pending 消息pendingr.xpending(stream_key, group_name)ifpending[pending]0:return# 获取空闲超过 idle_ms 的消息claimedr.xpending_range(stream_key, group_name,min-,max,count10)forpinclaimed: msg_idp[message_id]# 检查重试次数可存储在消息字段或额外 Redis keymsgsr.xrange(stream_key,minmsg_id,maxmsg_id)ifnot msgs:continuemsg_datamsgs[0][1]retry_countint(msg_data.get(retry,0))ifretry_countmax_retries:# 移入死信队列r.xadd(f{stream_key}:dead, msg_data)r.xack(stream_key, group_name, msg_id)r.xdel(stream_key, msg_id)print(f消息 {msg_id} 超过重试次数移入死信队列)elifp[time_since_delivered]idle_ms:# XCLAIM 转移给恢复消费者r.xclaim(stream_key, group_name,recovery_consumer, idle_ms, msg_id)print(f消息 {msg_id} 被 recovery_consumer 接管)# 定时任务调用whileTrue: recover_pending(orders,order_processors,idle_ms30000)time.sleep(10)3.5 异步消费者redis.asyncio在异步框架中Stream 同样适用。importasyncioimportredis.asyncio as aioredis async def async_consumer(consumer_name): rawait aioredis.from_url(redis://localhost,decode_responsesTrue)try: await r.xgroup_create(orders,async_group,id0-0,mkstreamTrue)except: passwhileTrue: resultawait r.xreadgroup(async_group, consumer_name,{orders:},count1,block2000)ifresult:formsg_id, msg_datainresult[0][1]: print(f[异步 {consumer_name}] 处理 {msg_id})await r.xack(orders,async_group, msg_id)await asyncio.sleep(0.1)asyncio.run(async_consumer(worker-1))4. Stream 高级特性消息裁剪XTRIM限制 Stream 长度避免无限膨胀。XADD ... MAXLEN ~ 1000近似裁剪。消息范围查询XRANGE/XREVRANGE按 ID 范围查询历史消息。消费组删除XGROUP DESTROY orders group1。监控XINFO STREAM orders查看 Stream 概览长度、最后 ID 等。5. 常见误区与最佳实践别忘了 ACK未 ACK 的消息会堆积在 pending 列表占内存且影响消费进度。合理设置 Stream 长度历史消息会一直保存用MAXLEN控制容量。消费者组名全局唯一不要把不同业务的消费组名重名。XREADGROUP 的 用法是读取新消息0-0是读取历史具体ID是读取未确认的。死信队列生产必须设计重试上限和死信转移避免无限重试阻塞队列。6. 动手试试搭建三消费者组一个 Stream 创建两个消费者组groupA和groupB每个组各有两个消费者验证同一条消息会被两个组独立消费组内竞争消费。模拟消费者崩溃消费者读到消息后不 ACK然后杀掉进程用XPENDING和XCLAIM将消息转移给另一个消费者。死信队列实现一个最多重试 2 次的处理逻辑超过后移入dead:ordersStream并定期巡检死信 Stream。性能测试生产者批量 XADD 10 万条消息观察消费者组吞吐量及XINFO STREAM长度变化。预期效果多组消费互不影响崩溃消息自动转移死信队列正确隔离失败消息批量处理稳定。7. 总结Redis Stream 把“可靠消息队列”集成到 Redis 内核中具备持久化、消费者组、ACK、消息回溯等专业 MQ 的核心特性且保持了 Redis 的简单与高性能。相比独立的 Kafka/RabbitMQ它更适合中轻量级异步任务、事件驱动架构且复用现有 Redis 基础设施大幅降低运维成本。掌握 Stream你就拥有了在 Redis 生态中构建可靠消息管道的能力。下一篇我们将进入性能调优用慢日志、基准测试、大 Key 优化等手段把 Redis 的性能彻底榨干。想了解更多还可以去各个平台搜索「IT策士」一起升级 IT 思维

相关新闻

九大网盘直链下载助手:告别客户端限制,解锁高效下载新姿势
2026/6/15 0:57:55

九大网盘直链下载助手:告别客户端限制,解锁高效下载新姿势

九大网盘直链下载助手:告别客户端限制,解锁高效下载新姿势 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国…

阅读更多
21.2 mcp-server-chart 图表化作用
2026/6/15 0:57:55

21.2 mcp-server-chart 图表化作用

如何检查 langchain_mcp_adapters 版本和 antv/mcp-server-chart 安装 1. 检查 langchain_mcp_adapters 版本 在终端(确保已激活虚拟环境)中运行: pip show langchain_mcp_adapters输出示例: Name: langchain-mcp-adapters Ve…

阅读更多
如何3分钟免费解锁Cursor Pro:终极AI编程助手破解方案
2026/6/15 0:57:55

如何3分钟免费解锁Cursor Pro:终极AI编程助手破解方案

如何3分钟免费解锁Cursor Pro:终极AI编程助手破解方案 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached your tri…

阅读更多
给技术人的实验室认证扫盲贴:CNAS、CMA、CAL到底有啥区别?看完这篇就懂了
2026/6/15 1:57:55

给技术人的实验室认证扫盲贴:CNAS、CMA、CAL到底有啥区别?看完这篇就懂了

给技术人的实验室认证扫盲贴:CNAS、CMA、CAL到底有啥区别?看完这篇就懂了作为技术研发或质量工程师,你是否曾在项目送检时被各种认证标志绕晕?当客户要求提供"带CNAS章的报告"或采购部门询问"CMA和CAL哪个更权威&q…

阅读更多
2026终极Android保活方案:如何实现应用永生与无权限自启动?
2026/6/15 1:57:55

2026终极Android保活方案:如何实现应用永生与无权限自启动?

2026终极Android保活方案:如何实现应用永生与无权限自启动? 【免费下载链接】AndroidKeepAlive Android 保活方案,进程永生, 无权限自启动, 安装自启动,禁止卸载,后台弹出页面,体外弹出,现已全面支持安卓16! 项目地址: https://…

阅读更多
SH9自指螺旋的曲率演化动力学:认知层级跃升的几何规律(世毫九实验室原创研究)
2026/6/15 1:57:55

SH9自指螺旋的曲率演化动力学:认知层级跃升的几何规律(世毫九实验室原创研究)

SH9自指螺旋的曲率演化动力学:认知层级跃升的几何规律(世毫九实验室原创研究) 作者:方见华 单位:世毫九实验室 本研究将自指迭代过程与自指螺旋拓扑的缠绕层级严格定量耦合,建立“概念→推理→高阶概念→高…

阅读更多
Diablo Edit2:免费开源的暗黑破坏神2存档编辑器终极指南
2026/6/15 1:57:55

Diablo Edit2:免费开源的暗黑破坏神2存档编辑器终极指南

Diablo Edit2:免费开源的暗黑破坏神2存档编辑器终极指南 【免费下载链接】diablo_edit Diablo II Character editor. 项目地址: https://gitcode.com/gh_mirrors/di/diablo_edit 你是否厌倦了在暗黑破坏神2中花费数小时刷装备却一无所获?是否想要…

阅读更多
别再被Cartographer的.lua配置文件坑了!手把手教你配置revo_lds.lua和demo_revo_lds.launch(附镭神LS-N10雷达实例)
2026/6/15 1:57:55

别再被Cartographer的.lua配置文件坑了!手把手教你配置revo_lds.lua和demo_revo_lds.launch(附镭神LS-N10雷达实例)

Cartographer深度配置实战:从参数解析到镭神LS-N10雷达适配指南当你在ROS环境下第一次成功运行Cartographer时,那种成就感确实令人振奋。但很快你会发现,默认配置往往无法直接适配你的硬件设备,尤其是当涉及到多传感器融合时。本文…

阅读更多
Redis 从入门到精通:Redis Stream —— 可靠消息队列
2026/6/15 0:57:55

Redis 从入门到精通:Redis Stream —— 可靠消息队列

IT策士 10余年一线大厂经验,专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章,助你少走弯路。 前面我们学了 List 做队列、Pub/Sub 做广播,但它们都有一个硬伤:消息可靠性不足。List 弹出的消息就没了,…

阅读更多
别再只用BERT了!用Transformers库的AutoModel,5分钟搞定文本相似度计算(附代码对比)
2026/6/14 0:57:30

别再只用BERT了!用Transformers库的AutoModel,5分钟搞定文本相似度计算(附代码对比)

超越BERT:用Transformers库高效实现文本相似度计算的三种实战方案在自然语言处理领域,文本相似度计算是信息检索、问答系统和推荐系统等应用的核心技术。传统方法如TF-IDF或Word2Vec已逐渐被基于Transformer的预训练模型所取代。Hugging Face的Transform…

阅读更多
Prompt Engineering:重构人机协作的工程化方法论
2026/6/14 0:57:30

Prompt Engineering:重构人机协作的工程化方法论

1. 项目概述:这不是“写提示词”,而是重构人机协作的底层逻辑“Prompt Engineering”这个词,这两年被讲得太多,也太轻飘。很多人把它理解成“给AI发指令的技巧”,甚至简化为“多加几个形容词”“换种说法再试一次”。我…

阅读更多
Anthropic提示层归零:模型即协议的工程实践
2026/6/14 0:57:30

Anthropic提示层归零:模型即协议的工程实践

1. 项目概述:这不是一次普通更新,而是一次架构级“蒸发”“Anthropic Just Shipped the Layer That’s Already Going to Zero”——这个标题一出来,我正在调试一个Claude调用链的终端前停了三秒。不是因为震惊,而是因为熟悉&…

阅读更多
TEKLauncher:终极ARK模组管理与性能优化解决方案
2026/6/15 0:57:55

TEKLauncher:终极ARK模组管理与性能优化解决方案

TEKLauncher:终极ARK模组管理与性能优化解决方案 【免费下载链接】TEKLauncher Launcher for ARK: Survival Evolved 项目地址: https://gitcode.com/gh_mirrors/te/TEKLauncher 你是否为ARK: Survival Evolved复杂的模组管理和服务器连接问题而烦恼&#xf…

阅读更多
如何3分钟免费解锁Cursor Pro:终极AI编程助手破解方案
2026/6/15 0:57:55

如何3分钟免费解锁Cursor Pro:终极AI编程助手破解方案

如何3分钟免费解锁Cursor Pro:终极AI编程助手破解方案 【免费下载链接】cursor-free-vip [Support 0.45](Multi Language 多语言)自动注册 Cursor Ai ,自动重置机器ID , 免费升级使用Pro 功能: Youve reached your tri…

阅读更多
21.2 mcp-server-chart 图表化作用
2026/6/15 0:57:55

21.2 mcp-server-chart 图表化作用

如何检查 langchain_mcp_adapters 版本和 antv/mcp-server-chart 安装 1. 检查 langchain_mcp_adapters 版本 在终端(确保已激活虚拟环境)中运行: pip show langchain_mcp_adapters输出示例: Name: langchain-mcp-adapters Ve…

阅读更多
GIT修改用户名
2026/6/14 11:53:59

GIT修改用户名

在GIT中修改用户名可按以下步骤操作: 查看当前git的用户名,使用命令git config --list或git config user.name。修改git用户名,使用命令git config --global user.name "xxx(新的用户名)",将其中…

阅读更多
Win11Debloat:让你的Windows系统重获新生的终极优化工具
2026/6/15 2:21:34

Win11Debloat:让你的Windows系统重获新生的终极优化工具

Win11Debloat:让你的Windows系统重获新生的终极优化工具 【免费下载链接】Win11Debloat A simple, lightweight PowerShell script that allows you to remove pre-installed apps, disable telemetry, as well as perform various other changes to declutter and …

阅读更多
技术深度解析:m4s-converter实现原理与B站缓存视频转换最佳实践
2026/6/14 15:49:58

技术深度解析:m4s-converter实现原理与B站缓存视频转换最佳实践

技术深度解析:m4s-converter实现原理与B站缓存视频转换最佳实践 【免费下载链接】m4s-converter 一个跨平台小工具,将bilibili缓存的m4s格式音视频文件合并成mp4 项目地址: https://gitcode.com/gh_mirrors/m4/m4s-converter m4s-converter是一个…

阅读更多