《RocketMQ源码分析之消息轨迹分析.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之消息轨迹分析.docx(32页珍藏版)》请在第一文库网上搜索。
1、RocketMQ源码分析之消息轨迹【导读】本篇文章将会从以下方面介绍消息轨迹:什么是消息轨迹?如何启用消息轨迹?示例demo、消息轨迹的源码分析和消息轨迹中存储的信息。最后会用一张图来总结消息轨迹的工作原理。一、消息轨迹1什么是消息轨迹?消息轨迹是用来跟踪记录消息发送、消息消费的轨迹。2.如何启用消息轨迹?broker端需要在broker端的配置文件中添加配置项:traceTopicEnab1e=true,注意:对于消息轨迹数据量较大的场景,可以在R。CketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。客户端pro
2、ducer端和consumer端需要启用消息轨迹,具体是在初始化客户端时打开打开启用消息轨迹的开关并根据实际需求决定是否使用默认的topic来存储消息轨迹pub1icDefau1tMQProducer(fina1StringproducerGroup,boo1eanenabeMsgTrace)PUb1iCDefaU1AMQProducer(fina1StringProdUCerGrOup,boo1eanenab1eMsgTrace,fina1StringCUStOnIiZedTraCeTOPiC)PUb1iCDefa111.tMQProducer(fina1StringProdUCerGrou
3、p,RPCHoOkrpcHook,boo1eanenab1cMsgTrace,fina1SIringCUS1onIiZedTraCCToPiC)IPUb1iCDCfaU11MQPrOdUCCr(fina1SIringnamespace,fina1S1ringP1roducerGroup,RPCHookrpcHook,boo1eanenabIeMsgTrace,fina1Stri1ngCUStOmiZedTraCeToPiC)PUbiiCDefau1tMQPushConsumer(fina1StringConSUmerGrOup,boo1ean,enab1eMsgTrace)PUb1iCDefa
4、UI1MQPUShConSUnIer(fina1S1ringConSUnIerGrOUp,boo1ean,CnabIcMsgTrace,Fina1S1ringCUS1omiZCdTraeePUbIiCDefaU1IMQPUShCOnSUmer(fina1StringConSUmerGroup,RPeHooMrpcHook,A1IoCateMeSSageQUeUeStrategya1IocateMessageQueueStrategIy,boo1eanenabIeMsgTrace,fina1StringCustoniizedTraceTopic)pub1icDefaUItMQPUShCemSUm
5、er(fina1Stringnamespace,fina1Strin1IgCOnSUmerGroup,RPCHOOkrpcHook,AI1OCateMeSSageQUeUeStrategyaI11ocateMessageQueueStrategy,boo1eanBnab1eMsgTrace,fina1String1消息轨迹存储的topic默认情况下消息轨迹是存储在RMQ.SYS_TRACETOPIC,此外消息轨迹还可以存储在用户自定义的topic中,注意:自定义的topic需要提前创建3.示例demoIPUbIiCStatiCVoidmain(String1args)throwsMQCIien
6、tE1xception,InterrUPtedEXCePtionIDefaU1tAIQProducerPrOdUCer=newDefaUI1MQProdUCe1卜(ProducerGroupNam。,1rue);ior(hi:i():i/128:iMeSSagemsg=newMeSSageCTopicTestr,OrderID188r,“He11owor1d.getBytS(RemotingHe1Per.DEFA11T_CHARSET);SendReSUI1SendReSUII=Pt)ducer.Send(msg);System.out.printf(,%s%n,sendResu1t);Cat
7、Ch(EXCePtiOne)e.prin1S1ackTrace();producer,shutdown();PUb1iCStatiCVoidmain(String口args)throwsInteTTUPt1/Here,IFeUSethedefan11messagetracktraceIDefau1tMQpushConsumerCOnSUmer二newDefaU1.tMQPusIconsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUMEIConSUmer.registerMessage1istener(newMeSSage1iStener1ICOn
8、CUrrent1y()IPUb1iCConSUnIeConCUrren11ySIatUSConSUnIeMeS1ISage(1iStmsgs,COnSUnIeConCUrrentJyCon1exICOnteXI)ISyStem.out,printf(/sReCeiVeNeWMessages:%s/n”,Thread.CurrentThread().getName(),msgs);MIreturnCOnSUmeConCUrrentI,yStatus.CONCOnSUIner.Start();Sys1em.OUI.PrinIf(ConsumerS1ar1ed.);二、消息轨迹源码分析下面分别从pr
9、oducer端、consumer端看看消息轨迹是如何产生以及如何发送到broker端。1、 producer端(1)初始化PrOdUCer在PrOdUCer初始化阶段会完成以下操作: 初始化producer 初始化AsyncTraceDiSpatcher对象traceDispatcher,AsyncTraceDiSpatcher是实现消息轨迹功能的重点,后面会详细介绍 在PrOdUCer端会注册SendMeSSageHOOkIPUb1iCDefau1tMQProduceNfina1Stringnamespace,fina1StringP1KodUCerGrOup,RPCHOOkrpcHook,
10、Iboo1eanenabIeMsgTrace,fina1StringCUStOiniZedTr1Ithis,namespace二namespaceIthis.producerGroup二PrOdUCerGrOUICIefaU1tMQProdUCer1mP1二newDefaU1tMQPrOcIUCerImP1(IrpcHook);AsyncTraceDispatcherdispatcher=newASynCTraCeDiSPatXher(PrOdUCerGrOUp,TraCeDiSPatXher.Type.PRODiC1E,CUStOmizedTraceTOPic,rpcHook);dispat
11、cher.SetHOStPrOdUCer(Ihis.ge1DetraceDispatcher二dispatcher;Jthis.getDefau1tMQProducer1mp1().regisnewSendMeSSageTraCeHOok1mP11(IraCeDjSPatCher);CatCh(ThrOWabIeC)1og,error(systemmqtracehookini.J,.!.r1:-H11;.r.,1.,J;:(2)启动Pre)CIUCer在启动ProC1UCer的时候会启动traceDispatcher,下面详细看下启动traceDispatcher的过程中都完成了哪些操作: 在
12、ASynCTraCeDiSPatCher中traceProducer的作用是发送消息轨迹到broker,这里会启动traceproducer 会启动一个WOrker线程,其完成的任务封装在ASynCRUnnabIe中 注册一个ShutdownHookIPUbIiCVOidStart()throwsMQCiientEXCePtionIthis.SetPrOdUCerGroUP(WithNameSPaCe(this.PrOdUCerGrOU1Ithis.defaItMQProducerImp1Start();Iif(nu11!二traceDispatcher)IIraceDispatcher.St
13、art(Ihis.getNamesrvIAdeIr(),this.getAccessChanne1();ICatCh(MQCIientEXCePtiOne)1og,warn(tracedispatcherStartfii1edn,e);pub1icVoidStart(StringnameSrvAddr,ACCeSSChanne1accessChanneif(isStartec1CompareAndSet(fa1se,true)traceProducer.SetNamesrvAddr(nameSrvddr);tracePduceseHnstanceNamACNSTANC,1hI,djr.n::I
14、IhiS.accessChanne1二accessCharme1;IIhiS.worker二newThread(newASynCRUrmabIC(),W-AsyncTraceDiSpatcher-Thread-+dispatcherId)Ithis,worker.SetDaenIon(true);Ithis,worker.Start();Ithis.regiSterShutDownHook();下面详细分析AsyncRunnab1e,从其run函数可以看到它主要完成的任务是从IraceContextQueue队列中获取消息轨迹上下文信息,这里会获取IOO条信息,然后会根据这IOO条TraceC
15、ontext初始化一个AsyncAppenderRequest对象,最后将其提交到traceExecutor线程池中(注意:traceContextQueuebatchSize以及traceExecutor是在初始化traceDispatcher时确定的)。这里会有一个疑问:ASynCAPPenderReqUeSt的作用是什么?带着这个问题我们继续看AsyncAppenderRequestWhi1e(!stopped)Array1ist(batchSize);for(inti=0;ibatchSize;i)TraceContextcontext=nu1EBtrygettracedataeb1ockingQueuetraceContextQucontext=traceContxtQueue.po11(5,Timeun