RocketMQ源码分析之消息轨迹分析.docx

上传人:lao****ou 文档编号:406523 上传时间:2023-10-25 格式:DOCX 页数:32 大小:142.29KB
下载 相关 举报
RocketMQ源码分析之消息轨迹分析.docx_第1页
第1页 / 共32页
RocketMQ源码分析之消息轨迹分析.docx_第2页
第2页 / 共32页
RocketMQ源码分析之消息轨迹分析.docx_第3页
第3页 / 共32页
RocketMQ源码分析之消息轨迹分析.docx_第4页
第4页 / 共32页
RocketMQ源码分析之消息轨迹分析.docx_第5页
第5页 / 共32页
亲,该文档总共32页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述

《RocketMQ源码分析之消息轨迹分析.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之消息轨迹分析.docx(32页珍藏版)》请在第一文库网上搜索。

1、RocketMQ源码分析之消息轨迹【导读】本篇文章将会从以下方面介绍消息轨迹:什么是消息轨迹?如何启用消息轨迹?示例demo、消息轨迹的源码分析和消息轨迹中存储的信息。最后会用一张图来总结消息轨迹的工作原理。一、消息轨迹1什么是消息轨迹?消息轨迹是用来跟踪记录消息发送、消息消费的轨迹。2.如何启用消息轨迹?broker端需要在broker端的配置文件中添加配置项:traceTopicEnab1e=true,注意:对于消息轨迹数据量较大的场景,可以在R。CketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。客户端pro

2、ducer端和consumer端需要启用消息轨迹,具体是在初始化客户端时打开打开启用消息轨迹的开关并根据实际需求决定是否使用默认的topic来存储消息轨迹pub1icDefau1tMQProducer(fina1StringproducerGroup,boo1eanenabeMsgTrace)PUb1iCDefaU1AMQProducer(fina1StringProdUCerGrOup,boo1eanenab1eMsgTrace,fina1StringCUStOnIiZedTraCeTOPiC)PUb1iCDefa111.tMQProducer(fina1StringProdUCerGrou

3、p,RPCHoOkrpcHook,boo1eanenab1cMsgTrace,fina1SIringCUS1onIiZedTraCCToPiC)IPUb1iCDCfaU11MQPrOdUCCr(fina1SIringnamespace,fina1S1ringP1roducerGroup,RPCHookrpcHook,boo1eanenabIeMsgTrace,fina1Stri1ngCUStOmiZedTraCeToPiC)PUbiiCDefau1tMQPushConsumer(fina1StringConSUmerGrOup,boo1ean,enab1eMsgTrace)PUb1iCDefa

4、UI1MQPUShConSUnIer(fina1S1ringConSUnIerGrOUp,boo1ean,CnabIcMsgTrace,Fina1S1ringCUS1omiZCdTraeePUbIiCDefaU1IMQPUShCOnSUmer(fina1StringConSUmerGroup,RPeHooMrpcHook,A1IoCateMeSSageQUeUeStrategya1IocateMessageQueueStrategIy,boo1eanenabIeMsgTrace,fina1StringCustoniizedTraceTopic)pub1icDefaUItMQPUShCemSUm

5、er(fina1Stringnamespace,fina1Strin1IgCOnSUmerGroup,RPCHOOkrpcHook,AI1OCateMeSSageQUeUeStrategyaI11ocateMessageQueueStrategy,boo1eanBnab1eMsgTrace,fina1String1消息轨迹存储的topic默认情况下消息轨迹是存储在RMQ.SYS_TRACETOPIC,此外消息轨迹还可以存储在用户自定义的topic中,注意:自定义的topic需要提前创建3.示例demoIPUbIiCStatiCVoidmain(String1args)throwsMQCIien

6、tE1xception,InterrUPtedEXCePtionIDefaU1tAIQProducerPrOdUCer=newDefaUI1MQProdUCe1卜(ProducerGroupNam。,1rue);ior(hi:i():i/128:iMeSSagemsg=newMeSSageCTopicTestr,OrderID188r,“He11owor1d.getBytS(RemotingHe1Per.DEFA11T_CHARSET);SendReSUI1SendReSUII=Pt)ducer.Send(msg);System.out.printf(,%s%n,sendResu1t);Cat

7、Ch(EXCePtiOne)e.prin1S1ackTrace();producer,shutdown();PUb1iCStatiCVoidmain(String口args)throwsInteTTUPt1/Here,IFeUSethedefan11messagetracktraceIDefau1tMQpushConsumerCOnSUmer二newDefaU1.tMQPusIconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUMEIConSUmer.registerMessage1istener(newMeSSage1iStener1ICOn

8、CUrrent1y()IPUb1iCConSUnIeConCUrren11ySIatUSConSUnIeMeS1ISage(1iStmsgs,COnSUnIeConCUrrentJyCon1exICOnteXI)ISyStem.out,printf(/sReCeiVeNeWMessages:%s/n”,Thread.CurrentThread().getName(),msgs);MIreturnCOnSUmeConCUrrentI,yStatus.CONCOnSUIner.Start();Sys1em.OUI.PrinIf(ConsumerS1ar1ed.);二、消息轨迹源码分析下面分别从pr

9、oducer端、consumer端看看消息轨迹是如何产生以及如何发送到broker端。1、 producer端(1)初始化PrOdUCer在PrOdUCer初始化阶段会完成以下操作: 初始化producer 初始化AsyncTraceDiSpatcher对象traceDispatcher,AsyncTraceDiSpatcher是实现消息轨迹功能的重点,后面会详细介绍 在PrOdUCer端会注册SendMeSSageHOOkIPUb1iCDefau1tMQProduceNfina1Stringnamespace,fina1StringP1KodUCerGrOup,RPCHOOkrpcHook,

10、Iboo1eanenabIeMsgTrace,fina1StringCUStOiniZedTr1Ithis,namespace二namespaceIthis.producerGroup二PrOdUCerGrOUICIefaU1tMQProdUCer1mP1二newDefaU1tMQPrOcIUCerImP1(IrpcHook);AsyncTraceDispatcherdispatcher=newASynCTraCeDiSPatXher(PrOdUCerGrOUp,TraCeDiSPatXher.Type.PRODiC1E,CUStOmizedTraceTOPic,rpcHook);dispat

11、cher.SetHOStPrOdUCer(Ihis.ge1DetraceDispatcher二dispatcher;Jthis.getDefau1tMQProducer1mp1().regisnewSendMeSSageTraCeHOok1mP11(IraCeDjSPatCher);CatCh(ThrOWabIeC)1og,error(systemmqtracehookini.J,.!.r1:-H11;.r.,1.,J;:(2)启动Pre)CIUCer在启动ProC1UCer的时候会启动traceDispatcher,下面详细看下启动traceDispatcher的过程中都完成了哪些操作: 在

12、ASynCTraCeDiSPatCher中traceProducer的作用是发送消息轨迹到broker,这里会启动traceproducer 会启动一个WOrker线程,其完成的任务封装在ASynCRUnnabIe中 注册一个ShutdownHookIPUbIiCVOidStart()throwsMQCiientEXCePtionIthis.SetPrOdUCerGroUP(WithNameSPaCe(this.PrOdUCerGrOU1Ithis.defaItMQProducerImp1Start();Iif(nu11!二traceDispatcher)IIraceDispatcher.St

13、art(Ihis.getNamesrvIAdeIr(),this.getAccessChanne1();ICatCh(MQCIientEXCePtiOne)1og,warn(tracedispatcherStartfii1edn,e);pub1icVoidStart(StringnameSrvAddr,ACCeSSChanne1accessChanneif(isStartec1CompareAndSet(fa1se,true)traceProducer.SetNamesrvAddr(nameSrvddr);tracePduceseHnstanceNamACNSTANC,1hI,djr.n::I

14、IhiS.accessChanne1二accessCharme1;IIhiS.worker二newThread(newASynCRUrmabIC(),W-AsyncTraceDiSpatcher-Thread-+dispatcherId)Ithis,worker.SetDaenIon(true);Ithis,worker.Start();Ithis.regiSterShutDownHook();下面详细分析AsyncRunnab1e,从其run函数可以看到它主要完成的任务是从IraceContextQueue队列中获取消息轨迹上下文信息,这里会获取IOO条信息,然后会根据这IOO条TraceC

15、ontext初始化一个AsyncAppenderRequest对象,最后将其提交到traceExecutor线程池中(注意:traceContextQueuebatchSize以及traceExecutor是在初始化traceDispatcher时确定的)。这里会有一个疑问:ASynCAPPenderReqUeSt的作用是什么?带着这个问题我们继续看AsyncAppenderRequestWhi1e(!stopped)Array1ist(batchSize);for(inti=0;ibatchSize;i)TraceContextcontext=nu1EBtrygettracedataeb1ockingQueuetraceContextQucontext=traceContxtQueue.po11(5,Timeun

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 应用文档 > 汇报材料

copyright@ 2008-2022 001doc.com网站版权所有   

经营许可证编号:宁ICP备2022001085号

本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有,必要时第一文库网拥有上传用户文档的转载和下载权。第一文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知第一文库网,我们立即给予删除!



客服