RocketMQ源码分析之consumer并发消费信息分析.docx

上传人:lao****ou 文档编号:406513 上传时间:2023-10-25 格式:DOCX 页数:22 大小:119.33KB
下载 相关 举报
RocketMQ源码分析之consumer并发消费信息分析.docx_第1页
第1页 / 共22页
RocketMQ源码分析之consumer并发消费信息分析.docx_第2页
第2页 / 共22页
RocketMQ源码分析之consumer并发消费信息分析.docx_第3页
第3页 / 共22页
RocketMQ源码分析之consumer并发消费信息分析.docx_第4页
第4页 / 共22页
RocketMQ源码分析之consumer并发消费信息分析.docx_第5页
第5页 / 共22页
亲,该文档总共22页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述

《RocketMQ源码分析之consumer并发消费信息分析.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之consumer并发消费信息分析.docx(22页珍藏版)》请在第一文库网上搜索。

1、RocketMQ源码分析之consumer并发消费信息在RocketMQ源码分析之消息拉取流程文章最后留下了一个问题:consumer端在接收到消息后是如何消费信息呢?本篇文章就来回答这个问题。在RocketMQ中ConsumeMessageService是负责消息消费的,它其实是一个接口,实现该接口的是ConsumeMessageConcurrent1yService和ConsunieMessageOrder1yService,这两个服务分别对应一个消费模式,ConsumeMessageconcurrent1yService是并发消费,ConsumeMessageOrder1yService

2、是顺序消费。在RocketMQ源码分析之消息拉取流程中COnSUmer在收到broker返回的response后会执行回调pu11Ca11back,在回调函数中会将拉取到的消息放入ProcessQueue中,然后再将消息提交到ConsumeMessageQueue,我们就从这里开始分析,即submitConsumeRequest方法。IDefaUi,tMQPushConsumerImp1.this.ConSUmeMeSSageSerVice.SUbmitConSUmeReq1PU1IReSUIt.getMsgFound1ist(),PrOCeSSQUeUe,pu11Request.getMes

3、sageQueue(),SubmitConsumeRequest方法主要完成的提交信息消费请求,其实现逻辑如下:获取ConSUnIeBatChSize,它表示一次消费任务COnSUnIeReqUeSt中包含的消息条数,默认值是1。msgs,size()最大是32,如果msgs,size()小于ConsumeBatchSize则直接由msgs、processQueue和messageQueue构建ConsumeRequest,并将ConSUnIeReqUeSt提交到ConSUnIeEXeCUtOr(消费者线程池),如果在提交的过程中出现RejectedExecutionException异常则延

4、迟5秒再提交。如果InSgS.size()大于COnSU1neBatChSiZe则将msgs进行拆分,创建多个COnSUmeReqUeSt并进行提交,每个COnSUmeReqUeSt中包含ConsumeBatchSize条消息。IUb1icVOidSUbmitConsumeRequestfina11iStfina1ProCeSSQUeUePrOCeSSQUeUe,fina1MeSSageQUeUemessageQueue,fina1boo1eandiSpatchToConsume)tConsumeMessageBatchMac;八.if(msgs,size()二ConsumeBatchSize

5、)COnSUmeReqUeStCOnSUmeReqUeSt二newCOnSUmeReqUeSIhis.ConsumeExecu1or.SUbnIi1(COnSUnIeRCqUeSI);catch(RejectedExecutiOnExceptione)this,submiIConsumeRequest1ater(consumeRequeste1sefor(inttota1=0;tota1msgs,size();)1iStmsgThis=newArray1iStfor(inti=0;iConsumeBatchSize;i+,tota1+)if(tota1msgs,size。)msgThis.ad

6、d(msgs,get(tota1);e1sebreak;newConsuCOnSUn1eReqUeStCOnSUn1eReqUeStheRequest(msgThis,PrOCeSSQUeue,messageQueue);Ithis.consumeExecutor.SUbmit(COnSUmeRe1ICatCh(RejeCtedEXeCUtiOnEXCePtiOne),Ifor(;tota1msgs,size。;totaImsgThis.add(msgs,get(tota1);this.SUbmitCOnSUnIeReqUeSt1ater(COnSUm提交完ConSUmeReqUeSt后就是消

7、息消费了,具体是执行ConSUmeReqUeSt中的run方法,消费的具体逻辑是:1判断processQueue允许被消费,具体是检查其dropped属性,如果为true则表示不能被消费2 .初始化Message1istenerxConsumeconcurrent1yContext(consumer并发消费上下文)和Consumeconcurrent1yStatus(consumer并发消费状态,其状态分为两种:CONSUME_SUCCESS和RECONSUMEJATER)3 .执行KesetRetryAndNamespace方法,如果消息来自延迟队列则设置其topic为%RETRTOPIC%

8、+ConSUmerGroUP4 .如果开启了消息轨迹功能会在此处设置消费信息上下文ConsumeMessageContext并执行钩子函数5 .记录当前的时间(在消息消费完成后与此时间做差用来统计消费消息锁花费的时间)6 .遍历ConsumeRequest中的每条消息,将每条消息中开始消费时间属性设置为当前的时间,然后使用应用代码中注册的回调message1istener中的ConsumeMessage方法来消费消息并将消费结果记录在status中7 .计算消费消息所花费的时间ConsumeRT8 .判断执行完consumer中自定义的ConsumeMessage方法后的status,如果st

9、atus为空且在消费过程中出现异常则将returnType设置为ConsumeReturnType.EXCEPTION;如果status为空且没有出现异常则将returnType设置为ConsumeReturnType.RETURNNU11;如果ConsumeRT大于15分钟则将returnType设置为ConsumeReturnType.TIME_OUT;如果status的值为Consumeconcurrent1yStatus.RECONSUME_1ATER则将returnType设置为ConsumeReturnType.FAI1ED;如果status的值为ConsumeConcurrent

10、1yStatus.CONSUME_SUCCESS则将returnType设置为ConsumeReturnType.SUCCESS9 .如果开启了消息轨迹功能则将消费消息信息上下文ConsumeMessageContext中添加ConsumeContextType属性并将该属性的值设置为returnType10 .判断status是否为空,如果为空则将status设置为ConsumeConcurrentIyStatus.RECONSUME_1ATER11 .如果开启了消息轨迹功能则在此处会执行钩子函数12 .获取ConsumerStatsManager对象并将刚刚计算的消费消息花费时间添加到统计

11、信息中13 .判断processQueue的属性是否为fa1se,如果是fa1se则调用ProcessConsumeResuIt方法对消费信息的结果进行处理)ub1icVoie1run()if(this.processQueue.isDropped1og,info(themessagequeuenotbeab1etoCon1SUme,becauseitsdropped,group=,COnSUnIeMeSSageCOnCUrr1?ntIyService,this.COnSUmerGroup,this.messageQueue);return;ConsumeMessageConcuMessage

12、1istenerConcurrent1y1istenerrrentIyService.this.Hiessage1istener;COnSUnIeConCUrrent1yConteXtCOnteXt二newCOnSUmeCOnCUrrertIyContext(messageQueue);ConSUmeConCUrrentIyStatUSStatUS二nu11;defau1tMQPushConsumer1mp1.resetRetryAndNamespace(msgs,defau11MQPUShConSUmer.ge1ConsumerGroup();if(ConsumeMessageconcurr

13、ent1yService.this.defau1tMQPushConsCOnSUnIeMeSSageCOnteXt=newCOnSUmeMeSSageConteXt(IEBIConSUnIeMeSSageCOntext.SetNanIeSPaCe(defau1tMQPushConsumer.geINamespace();ICOnSUmeMeSSageContext.SetConSUmerGrC)UP(defau1tMQPushConsumer.getConsumerGroupO);ICOnSUmeMeSSageCOnteXt.setProps(newHaShMaPString,IConSUnI

14、eMeSSageCOntext.SetMq(IIIeSSageQUeUe);IConSUmeMeSSageContext.SetMSg1iSt(InSgS);ICOnSUnIeMeSSageCOntext.SetSUCCeSS(fa1se);ICOnSUmeMeSSageConCUrrent1yService.Ihis.defau11MQPushCInSUmerInIP1execute11ookBefore(ConsumeMessageContext);IongbeginTimestamp=SyStem.CurrentTimeMi11isOboo1eanhasException二fa1seCo

15、nSUmeRe1IIrnTyPerc1urnType二ConSUnICRe1UnITyPC.SUCCESS;1if(msgs!=nu11&!msgs.isEmptyfor(MeSSageEXtmsg:msgs)MessageAccessor.SetConSUmeStartTimeStS1a1US二IiS1Cnor.ConSUn1CMeSSagC(Co1ICCIiOns.UnmOdiTiabIe1ist(msgs),COnteXt);CatCh(ThroWabIee)Iog.warn(ConsumeMessageexception:Group:,RemOtingHe1,per,exceptionSimp1eDesc(e)ConsumeMessageconcurrent1yService.this,consummsgs,messageQueue);hasException=true;1ongConsumeRT=System.CurrentTimeMi11is()-beginTimestamp;if(nu11=status)

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 应用文档 > 汇报材料

copyright@ 2008-2022 001doc.com网站版权所有   

经营许可证编号:宁ICP备2022001085号

本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有,必要时第一文库网拥有上传用户文档的转载和下载权。第一文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知第一文库网,我们立即给予删除!



客服