Chronicle Queue:把 Disruptor 的数据落盘

张开发
2026/4/22 17:22:18 15 分钟阅读
Chronicle Queue:把 Disruptor 的数据落盘
之前聊过 Disruptor高性能队列 Disruptor它的性能逆天但有个致命问题纯内存进程挂了数据就丢了。Chronicle Queue 就是来解决这个问题的——持久化的 Disruptor。解决什么问题Disruptor 的问题 - 进程崩溃 → 数据全丢 - 重启后无法恢复之前的消息 - 只能作为临时缓冲 Chronicle Queue 的改进 - 数据落盘进程重启不丢失 - 支持消息重放Replay - 保留了 Disruptor 的高性能同样的团队LMAX同样的设计理念只是一个加了持久化。核心原理MMAP 内存映射Chronicle Queue 之所以快关键在于MMAP内存映射// 传统文件写入write(fd,data,size);// 用户态 → 内核态 → 磁盘// MMAP 写入memcpy(buffer,data,size);// 直接写内存由 OS 异步刷盘MMAP 把磁盘文件映射到内存写入像写内存一样快操作系统会自己在后台刷盘。文件结构chronicle-queue/ ├── 20190101.data # cycle 文件约 64MB ├── 20190102.data ├── 20190103.data └── 20190104.data 每个文件叫做一个 cycle写满自动切换下一个数据写入流程// 1. 写入数据到 MMAP 内存appender.writeText(订单创建: orderId1001);// 2. 数据立即可读内存读取Stringmsgtailer.readText();// 3. OS 在后台异步刷盘不需要等待快速开始引入依赖dependencygroupIdnet.openhft/groupIdartifactIdchronicle_queue/artifactIdversion5.22.4/version/dependency基础用法publicclassChronicleQueueDemo{publicstaticvoidmain(String[]args){Stringpath./data/orders;// 1. 创建队列单机模式ChronicleQueuequeueChronicleQueueBuilder.single(path)// 单文件模式.build();// 2. 获取写入器ExcerptAppenderappenderqueue.acquireAppender();// 3. 写入消息支持多种格式appender.writeText(订单1: 100元);appender.writeText(订单2: 200元);appender.writeText(订单3: 300元);// 4. 获取读取器ExcerptTailertailerqueue.createTailer();// 5. 读取所有消息while(true){Stringtexttailer.readText();if(textnull){break;// 没有更多消息}System.out.println(收到: text);}queue.close();}}输出收到: 订单1: 100元 收到: 订单2: 200元 收到: 订单3: 300元写入/读取二进制数据Text 方便但不类型安全实际项目推荐用二进制// 1. 定义 Order 消息DatapublicclassOrder{privatelongorderId;privatedoubleamount;privateStringstatus;}// 2. 使用 BinaryWire 写入ChronicleQueuequeueChronicleQueueBuilder.single(./data/orders).wireType(WireType.BINARY).build();ExcerptAppenderappenderqueue.acquireAppender();// 写入appender.writeDocument(w-w.write(orderId).int64(1001).write(amount).float64(299.00).write(status).text(pending));// 读取ExcerptTailertailerqueue.createTailer();while(tailer.readDocument(r-{longorderIdr.read(orderId).int64();doubleamountr.read(amount).float64();Stringstatusr.read(status).text();System.out.println(orderId, amount, status);})){// 继续读取}服务重启后数据恢复这是 Chronicle Queue 最厉害的地方——重启后消息还在publicclassDataRecoveryDemo{publicstaticvoidmain(String[]args){Stringpath./data/orders;// 第一次运行写入数据writeData(path);// 模拟进程重启...// 第二次运行读取数据刚才写的还在readData(path);}privatestaticvoidwriteData(Stringpath){ChronicleQueuequeueChronicleQueueBuilder.single(path).build();ExcerptAppenderappenderqueue.acquireAppender();for(inti0;i100;i){appender.writeText(订单 i);}queue.close();System.out.println(写入完成进程退出);}privatestaticvoidreadData(Stringpath){ChronicleQueuequeueChronicleQueueBuilder.single(path).build();ExcerptTailertailerqueue.createTailer();intcount0;while(true){Stringtexttailer.readText();if(textnull)break;count;}System.out.println(重启后读到 count 条消息);queue.close();}}输出写入完成进程退出 重启后读到 100 条消息异步写入Chronicle Queue 默认是同步写入内存但不刷盘如果需要更高吞吐// 异步写入不等待appender.writeText(订单,()-异步订单数据);// 或者批量写入for(inti0;i10000;i){appender.writeText(订单 i);// 先写入到内存积累一定量后批量刷盘}指定读取位置有时候不需要从头读只想从某个时间点开始// 从指定时间开始读取longfromTimeSystem.currentTimeMillis()-3600_000;// 1小时前ExcerptTailertailerqueue.createTailer().toStart()// 从头开始// 或// .toEnd() // 只读新消息// .to(1234567890L) // 读到指定位置;// 遍历while(true){Stringtexttailer.readText();if(textnull)break;// 处理...}为什么不都用 Chronicle Queue说了这么多好处为什么大家还是在用 KafkaChronicle Queue 的局限不能分布式 - 只能单机玩想扩容没门不支持多消费者 - 一条消息只能被一个进程拿走数据堆积有限 - 单机磁盘多大它就能存多少没有生态 - 没有管理后台、没有监控面板说白了Chronicle Queue 就像一辆超跑赛道无敌但上不了高速不能分布式。Kafka 就像 SUV什么都能干虽然跑赛道不如超跑但胜在能拉人能越野支持集群、广播、海量数据。选谁同机器进程间通信、高性能低延迟 → Chronicle Queue跨机器、分布式、海量数据 → Kafka/RabbitMQ两者不是一个赛道的没有谁取代谁的说法。注意事项1. 磁盘空间数据不会自动清理需要定期清理旧 cycle 文件# 手动清理 3 天前的文件find./data/orders-name*.data-mtime3-delete# 或配置 Chronicle Queue 自动清理ChronicleQueueBuilder.single(path).rollCycle(RollCycles.MINUTELY)// 按分钟轮转 .build();2. 数据安全如果需要更严格的数据安全同步刷盘ChronicleQueuequeueChronicleQueueBuilder.single(path).forceWrites(true)// 每次写入都同步刷盘会变慢.build();3. 读取的唯一性多个进程读同一个 Queue每条消息可以被每个进程读到一次不是广播模式。如果需要广播用 Chronicle Broadcast。总结Chronicle Queue 就是一个能落盘的 DisruptorMMAP让写入像内存一样快cycle 文件实现数据持久化和自动轮转支持重放让故障恢复变得简单适合场景日志持久化、事件溯源、低延迟进程间通信。如果你的业务需要高性能又不希望丢数据Chronicle Queue 是一个很好的选择

更多文章