RocketMQ源码分析之消息写入.docx

上传人:lao****ou 文档编号:406505 上传时间:2023-10-25 格式:DOCX 页数:42 大小:255.87KB
下载 相关 举报
RocketMQ源码分析之消息写入.docx_第1页
第1页 / 共42页
RocketMQ源码分析之消息写入.docx_第2页
第2页 / 共42页
RocketMQ源码分析之消息写入.docx_第3页
第3页 / 共42页
RocketMQ源码分析之消息写入.docx_第4页
第4页 / 共42页
RocketMQ源码分析之消息写入.docx_第5页
第5页 / 共42页
亲,该文档总共42页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述

《RocketMQ源码分析之消息写入.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之消息写入.docx(42页珍藏版)》请在第一文库网上搜索。

1、RocketMQ源码分析之消息写入ROCketMQ是阿里巴巴开源的分布式消息中间件,它具有低延迟、高性能、高可靠性、万亿级容量和灵活的扩展性。本篇文章介绍了其存储文件和存储整体架构,并从源码角度分析了消息写入流程以及消息刷盘。1. RocketMQ存储文件Rocketmq存储路径为$ROCKETIoME/store,主要存储以下文件: Commit1og消息存储目录 Consumequeue消息消费队列存储目录 index消息索引文件存储目录checkpoint文件检查点,存储COmmit1og、ConSUmeqUeUe和index文件最后一次刷盘时间戳 abort如果abort文件存储则表示

2、broker非正常关闭,否则表示broker正常关闭。该文件是在broker启动的过程中创建的。 configbroker运行期间一些配置信息,主要包含以下信息: ConsumerFi1ter.json该文件保存的是每个topic中消息的过滤逻辑 ConsumerOffset.json该文件保存的是每个COnSUnIergroup的消费进度 de1ayffset.json该文件保存的是延迟消息队列拉取进展 SubscriptionGroup.json该文件保存的是每个消费者的订阅信息 topics,json该文件保存的是topic的配置信息2.RocketMQ消息存储整体架构消息存储架构图中有

3、三个与消息存储相关的文件,分别是COiTimit1og.Consumequeue和index0RocketMQ通过使用内存映射文件来提高IO访问性能,无论是COmmit1og、COnSUnIeqUeUe还是index,单个文件都被设计为固定长度,如果一个文件写满以后再创建一个新文件。COnImitIog和COnSUnIeqUeUe的文件名称是该文件第一条消息对应的全局物理偏移量,index的文件名称是以创建文件的时间戳命名。在ROCketMQ中所有topic的消息都存储在同一个文件中,这样就确保了发送时顺序写文件及消息发送的高性能和高吞吐量。但是RocketMQ是基于topic的消息订阅机制,

4、这样便给消息消费以及消息检索带来了极大的不便。为了提高COnSUmer消费消息的效率,RoCketMQ引入了Consumequeue,ConSUmeqUeUe文件组织方式是$R0CKETJI0ME/store/consumequeue/topic名称/queueid/,它记录的是消息的COmmitIogOffset、消息大小和taghashcode为了提高消息检索的功能,ROCketMQ中引入了index文件,其hash冲突设计理念借鉴了JaVa中HaShMaP的结构。index文件包含三个部分:IndeXHeader、HaSh槽和IndeX条目,其中IndeXHeader记录了index中包

5、含消息的最大及最小存储时间、最大及最小物理偏移量、hashS1ot个数、index条目列表当前已使用的个数,IndeX条目记录的是消息key的hashcode、消息的COmmit1ogoffset、消息与第一条消息的时间戳差值及该条目的前一条目的index索引。(注意:根据keyhashcode定位hash槽可能会引发hash冲突,index文件为了解决hash冲突其解决方法是每个hash槽存储的是落在这个槽的hashcode最新的index的索引,新的index条目的最后四个字节存储该槽上一个条目的index的下标。)消息存储架构图可以简化为以下流程: producer发送消息到broker

6、 broker采用同步或者异步方式将消息刷盘持久化 broker的master和SIaVe之间数据同步 broker后台服务线程ReputMessageService分发请求构建Consumequeue和index文件本篇文章我们一起先来看下消息的写入流程。3 .MappedFi1e与MappedFi1eQueue在RocketMQ中使用MappedFi1e和MappedFi1eQueue来封装存储文件,MappedFi1e是RocketMQ内存映射文件的具体实现,MappedFi1eQueue是MaPPedFi1e的管理容器,MaPPedFiIeQUeUe是对存储目录的封装。下图可以表示出两

7、者的关系:MappedFi1e重要属性如下所示:MappedFi1eQueue重要属性如下所示:4 .消息写入4.1 消息写入流程消息写入的整体流程如下图所示:Commi11OgftputMessage流程如下图所示:下面我们详细分析写入流程中几个比较重要的方法: get1astMappedFiIe(fina11ongStartOffset,boo1eanneedCreate) appendMessagesInner(fina1MessageExtmessageExt,fina1AppendMessageCa1IbackCb)及CIOAPPenC1(fina11ongfiIeFromOffse

8、t,fina1ByteBufferbyteBuffer,fina1intmaxB1ank,fina1MessageExtBrokerInnermsg1nner)4.2获取最新的mappedFi1e获取最新的mappedFiIe的方法是get1astMappedFiIe(fina11ongStartOffset,boo1eanneedCreate),其实现逻辑如下: 在MaPPedFi1eQUeUe中使用CopyOnWriteArray1istHiappedFi1es记录了mappedFi1e的集合,在写入数据时我们总是在最新的mappedFi1e中写入数据,所以首先从HiappedFi1es中

9、获取最后一个HiappedFi1e 最新的mappedFi1e为空,这种情况下计算待创建的HiappedFi1e的起始Offset0mappedFi1e为空的场景是第一次使用broker 最新的mappedFi1e不为空并且已经写满了,这样情况下也需要计算待创建的mappedFi1e的起始offset,计算方法是最新mappedFi1e的初始偏移量与每个mappedFi1e大小的和 如果待创建的mappedFi1e的offset不为T并且needCreate为true,构建出待创建的mappedFi1e的文件路径nextFiIePath以及再下一个mappedFiIe的文件路径nextNext

10、FiIePath,然后调用a1IocateMappedFiIeService服务的PutRequestAndReturnMappedFi1e方法构建AIIOCateReqUeSt(该请求实现了COnIPareTO方法,请求是按照文件名称从小到大排序的,即创建ITIaPPedFiIe是有序的)请求并将请求放在其待处理的队列中,后台a1IocateMappedFiIeService服务会从请求队列中获取请求并创建mappedFi1e。创建InaPPedFiIe的方法是a1IocateMappedFiIeService服务中的mmapOperation(),这里面需要注意:创建mappedFiIe有

11、两种不同的方式。方式一:mappedFiIe-Service1oader.1oad(MappedFi1e.c1ass),iteratorO.next();mappedFi1e.init(req.getFiIePath(),req.getFiIeSizeO,messageStore.getTransientStorePoo1();这种方式是在broker的配置文件中刷盘方式是异步刷盘并且TransientstorePoo1Enab1e为true的情况下生效,该方式下MappedFi1e会将向TranSientStorePoO1申请的堆外内存(DirectByteBuffer)空间作为WriteB

12、uffer,写入消息时先将消息写入WriteBuffer,然后将消息提交至fI1eChanne1最后再f1usho方式二:mappedFiIe-newMappedFiIe(req.getFiIePathO,req.getFi1eSize();这种方式是直接创建MappedFi1e内存映射文件字节缓冲区mappedByteBuffer,将消息写入mappedByteBuffer再fIusho如果最新的mappedFiIe不为空则直接返回该mappedFiIe即可get1astMappedFiIe(fina11ongStartOffset,boo1eanneedCreate)的实现如下:4.3追加

13、消息到mappedFi1e追加消息到mappedFi1e的实现方法是HppendMessagesInner(fina1MessageExtmessageExt,fina1AppendMessageCa1Ibackcb),其实现逻辑如下:获取mappedFiIe写指针位置判断写指针的位置与文件大小的关系,如果写指针的位置小于文件大小则按照消息的类型(普通消息及批量消息)调用AppendMessageCa1Iback的回调函数doppend追加消息,doAppend方法是追加消息的核心实现,其实现逻辑是:- 计算消息写入的位置- 为消息创建msg1d,其创建规则是4个字节IP+4个字节的端口号+8

14、字节的消息偏移量- 在commit1og的topicQueueTab1e记录Consumequeue的信息- 序列化消息(注意:ProdUCer发送的消息格式和broker最终存储的消息格式是不一样的),broker端存储的消息的格式如下:字字字段段段含大义小T消0息T条A目41总S长义小ZE度数M用A来G判I断C消4C息O是D正E常消息义小是空消息消B息O体DCYR4CCR校C验码Q消U息4E消字段段段含大义小E队I列Did消F息1f4A1GagQ消U息E在U消8E息O消义小F队S列E的T偏移*P消H息Y在ScIOCm8Am1iOtF1Fo字段段段含大义小SgE中T的偏移量消息S系Y统SfF141aAgG,例字段段段含大义小是否压缩、是否是事务消息B消O息R生8N产义小I调M用E消S息T发A送MAPPI的时间戳B消O息R发义小H者O的SITP和端口号ST消O息R存E8储T时I间M义小STAMPSbTrOoRkEeHr8O的SITPA和D义小R口E号SSRECO消N息S重4U试M次E数TI义小

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 应用文档 > 汇报材料

copyright@ 2008-2022 001doc.com网站版权所有   

经营许可证编号:宁ICP备2022001085号

本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有,必要时第一文库网拥有上传用户文档的转载和下载权。第一文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知第一文库网,我们立即给予删除!



客服