RocketMQ源码分析之主从数据复制.docx

上传人:lao****ou 文档编号:406540 上传时间:2023-10-25 格式:DOCX 页数:47 大小:312.23KB
下载 相关 举报
RocketMQ源码分析之主从数据复制.docx_第1页
第1页 / 共47页
RocketMQ源码分析之主从数据复制.docx_第2页
第2页 / 共47页
RocketMQ源码分析之主从数据复制.docx_第3页
第3页 / 共47页
RocketMQ源码分析之主从数据复制.docx_第4页
第4页 / 共47页
RocketMQ源码分析之主从数据复制.docx_第5页
第5页 / 共47页
亲,该文档总共47页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述

《RocketMQ源码分析之主从数据复制.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之主从数据复制.docx(47页珍藏版)》请在第一文库网上搜索。

1、RocketMQ源码分析之主从数据复制前言在RocketMQ主从架构中master和s1ave之间会进行数据同步,其中数据同步包括元数据复制和coit1og复制,那么为什么同步的数据中不包括COnSUmeqUeUe和indexFiIe呢?这里大家可以思考下:master节点上Consumequeue和indexFi1e是根据commit1og构建的,所以s1ave在同步完COmmit1og后只需要根据COmmit1Og构建COi1SUmeqUeUe和indexFi1e即可。本篇文章就来分析下master和s1ave之间是如何进行数据同步?一、元数据复制1 .元数据复制入口在RocketMQ中的

2、主从架构中,在启动s1ave节点的过程中会启动一个定时任务,该定时任务的功能是从HIaSter节点获取元数据。具体如下:rivateVOidhand1eS1aveSynchronize(BrOkerRoIero1e)if(ro1e=BrokerRo1e.S1AVE)if(nu11!=SIaVeSynCFUtUre)S1aVeSynCFUtUre.CanCe1(fa1se);Ihis.S1aveSynchronize.SeIMaSIerAddr(nu11);S1aVeSynCFU1Ure二1his.SChedU1eCIEXeCUIorSetice.Schedu1eAtFixedRate(newR

3、unnab1eO0VerridPUbIiCVoidrun()BrokerContro11er.thiS1aveSynchronize.SynCAII();1og.error(riSchedu1edTISkS1aveSynchronizeSynCA1Ierror.”,e);,IOOO*3,IOOO*10,TimeUnit.MI11ISECONDS);e1sehand1ethes1if(nu11SIaVeSynCFUtUre)SIaVeSynCFUtUre.CanCe1(fa1se);1his.S1aveSynchronize.SC1MaSterAddr(nu11);this.SynCToPiCC

4、onfig()this.SynCConSUmerOffSet()2 Ihis.SynCSUbSCriPIiOnGrOUPCOnfig();3 .元数据复制都包含哪些内容?从SynCAI1()方法可以看出元数据复制主要包含以下文件:(1) topics,json:topic配置文件(2) ConsumerOffset.json:ConSUmer消费进度文件(3) de1ayffset.json:延迟消息拉取进度(4) SubscriptionGroup.json:ConsumerGroup配置文件3.元数据同步流程在RocketMQ中四种元数据文件同步的流程是一样的,这里以topics,json

5、为例来分析其流程。从上面SynCAI1()方法可知:topic配置文件的同步入口是SyncTopicConfigO方法,具体如下:IPriVateVoidSyncTopicConfigOIStringInasterAddrBak=this.HiasterAddr;Iif(masterAddrBak!=nu11&!masterAddrBak.equaIIS(brokerContro11er.getBrokerAddr()IToPiCCOnfigSeria1iZeWraPPertopicWraIthis.brokerContro11er.getBrokIeroUterAPI().getA1IToP

6、iCConfig(masterAddrBak);Iif(!this.brokerContro11er.getTopicConfigManager().getDataVersionI.equa1s(topicWrapper.getDataVIIhis.brokerContro11er.getTopibConfigManager().getDataVersion()I.assignNewOne(topicWjgetDatayersionO);this.brokerContro11er.getTopiIthis.brokerContro11er.getTopiI.PUtAI1(topicWrappe

7、r.Ithis.brokerContro11er.getTopiI1og,info(UpdateSIaVetopiConfigfrommaster,“,masterAddrBak);1og,error(SyncTopicConfigEXCePtioI首先s1ave会通过getA1ITopicConfig方法以同步调用的方式向master发送RequestCode.GET_A11_TOPTC_CONFIG请求来获取topic配置文件信息。IUb1iCTOPiCConfigSeria1iZeWraPPergetA1ITopicConfig(fina1S1ringaddr)IhrOWSRemotin

8、gConnec1Excep1i(RemotingSendRequestException,RemotingTinieoutException,InterruptedException,MQB1rokerExceptionRenIOtingCommandrequest二RemotingCommand.CreateRe1RCmoIingCOmnIandresponse二IhiS.Jremo1ingC1ien1.invOkeSynC(MiXA11brC)kerVIPChanne1(true,addr),request,assertresponse!=nu1SWitCh(response.getCod

9、e()returnToPiCConfigSeria1iZeWraPPer.decod。(response.gc1Body(),TOPiCCOnfigSCriaIiZeWraPper.CIdSS)defau1t:break;thrownewMQBrOkerEXCePtiOn(response.getCode(),reSPonSe.getRemark(),addr);master在收到s1ave端的请求后会在AdminBrokerProcessor中进行处理,具体是调用getAHTopicConfig方法来处理,其处理过程就是将master端的topicConfigTab1e和dataVersio

10、n编码成json字符串并返回给s1aveoIriVaICRemOIingeommandge1A1IToPieConfig(ChanneIHand1erCOnIeX1C1fina1RemotingCommandresponse二RemotingCommand./fina1GetAIIToPiCConfigReSPonSeHeaderresponseh(GetA11TopiCConfigResponseHeader)response,read(SIringCon1en1二IhiS.brokerConIrOI1Cr.gc1TOPiCConif(Content!=nu11&content,1ength

11、。0)response.SetBOdy(COntent.getBytes(MixCatCh(UnSUPPortedEnCodingEXCePtionC)1og.error(r,e);response.SetCOde(ReSPonSeCOde.SYSTEMIresponse.SetRenIark(UnsupportedEncodiIIngExceptionW+e);Ire1urnresponseI1og,error(Notopicinthisbroker,C1ie1nt:,ctx.channe1().remoteAddress();Iresponse.SetRemark(Notopicinthi

12、sbrokIreturnresponseresponse.setCode(ResponseCode.SUCCESS);returnresponse;s1ave在收到master返回的数据后会先判断本地的dateVersion与master返回的是否一样,如果不一样则会进行以下操作:(1)更新S1aVe的dataVersion(2)清空s1ave端的IopicConfigTab1e并将master返回的数据写入(3)将topic配置进行持久化最后用下图来总结下整个流程:RocketMQ其余的元数据同步过程与上图一样只是发送的请求类型不一样,在阅读源码时我有注意到一个问题:在同步topic配置文件

13、时采用的是VIP通道(即连接的是master的10909端口),而在同步其余三种元数据时采用的是10911端口,那么问题就是其余三种元数据在同步时为什么采用的是10911而不是10909?我在GitHUb上开了一个issue,如果大家有兴趣可以一起讨论。这里我认为所有的元数据同步应该都使用10909端口,所以在GitHUb提了一个pr来修复该问题。二、COmmitIOg复制commit1og复制相关服务是如何被启动的呢?broker在启动过程中会启动Defau1tMessageStore,在启动Defau1IMessageStore的过程中会判断broker是否启用了DIedger,如果没有启

14、动则会启动HASerVice,具体如下:IUb1iCVOidSIar1()IhrOWSEXCePIiOnIoCk=IoCkFiIe.ge1Channe1().Iry1ockS,1,fa1se);if(1ock=nu111ock.isShared()!1ock.isVa1id()thrownewRuntimeException(,1ockfai1ed,MQa1readystarted);IockFi1e.getChanne1().Write(ByteBUffer.VTaP(1ock”.getBytes();IockFi1e.getChanne1().force(true);icQueuemayb

15、obis、thi(Vturnedby1-orConunit1()jjus1IIOngmaXPhySiCaiTosIn1ogicQueue=COmInit1og.get1inffset()Ifor(COnCUrrentMaPInteger,ConsumeQueueAmaps:this.consumeQueueITabIe.va1ues。)Ifor(COnSUmeQUeUeIogiC:maps.VaIUeS()Iif(1ogic.ge1MaxPhysicOffset()maxPhysiCa1PoSIn1ogiCQUeUe)maxPhysica1PosIr1ogiCQUeUe二1ogic,getMaXPhySiCOffset();if(maxPhysica1PosIn1ogicQueueO)ITIaXPhySiCa1PoS1n1OgiCQUeUe二0;if(!Inessa

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 应用文档 > 汇报材料

copyright@ 2008-2022 001doc.com网站版权所有   

经营许可证编号:宁ICP备2022001085号

本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有,必要时第一文库网拥有上传用户文档的转载和下载权。第一文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知第一文库网,我们立即给予删除!



客服