《RocketMQ源码分析之ACL简介.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之ACL简介.docx(29页珍藏版)》请在第一文库网上搜索。
1、RocketMQ源码分析之AC1XhW【导读】本篇文章主要介绍了AC1的配置项、如何启用、示例demo以及其源码分析。一、AC1介绍1什么是AC1?AC1(权限控制)是ROCketMQ提供topic资源级别的客户端访问控制,客户端在使用ROCketMQ权限控制时,可以在客户端通过RPCHOOk注入ACCeSSKey和SecretKey签名。2 .p1ain_ac1.ym1文件在ROCketMQ中权限控制属性的配置是在PIain_ac1.ym1中,其配置项及其含义具体如下字段取值含义g1oba1WhiteRemoteAddresses*;192.168.*.*;192.168.0.1全局IP白名
2、单accessKey字符串AccessKeySecretKey字符串SecretKeyWhiteRemoteAddress*;192.168.*.*;192.168.0.1用户IP白名单admintrue;fa1se是否管理员账户defau1tTopicPermDENY;PUB;SUB;PUBISUB默认的TOPiC权限默认的defau1tGroupPermDENY;PUB;SUB;PUBISUBConsumerGroup权限topicPermstopic=权限各个Topic的权限各个groupPermsgroup=权限ConsumerGroup的权限字段取值含义这里RocketMQ对资源访问
3、控制权限的定义有以下四种:权限含义DENY拒绝ANYPUB或者SUB权限PUB发送权限SUB订阅权限注意:RoCketMQ的权限控制存储的默认实现是基于ym1配置文件。用户可以动态修改权限控制定义的属性,而不需重新启动Broker服务节点。3 .如何启用AC1?broker端在broker端首先需要配置好p1ain_ac1.ym1,然后在broker,conf中添加配置项ac1Enab1e=true(该配置项默认是fa1se)客户端客户端通过RPCHook注入AccessKey和SecretKey筌名4 .示例demobroker端p1ain_ac1.ym1配置1oba1WhiteRemQte
4、Addresses:ICCOUn1S:-accessKey:RoCketMQBWhiteRemoteAddress:admin:fa1sdefau1tTopicPerm:DENYtopiPrm_topic0607二P1BIgroupPerms:accessKey:rockeImqWhiteRemoteAddress:producerPriVateSIatiCfina1StringAC1SECRETKEY=”12345678”IPUbIiCStatiCVOidmain(String口args)throwsMQCIientE1newDefau1tMQProduceDefau1tMQProducer
5、producerr(ProdUCerGroUPName”,getc1RPCHook();producer.SetNamesrvAddr(,127.O.O.1:9876,t);ProdUCer.Start();Messagemsg=newMessageI1owor1”.ge1By1es(Reino1ingHe1per.DEFAU1TCHARSET);SendReSUItSendReSUIt=PtICIUCer.Send(msg);System.out.printf(,%s%nr,SEidResu1t);CatCh(EXCePtione)e.printStackTrace();PrOdUCer.S
6、hUtdOWn();StatiCRPCHOokgetAc1RPCHookOreturnnewAc1C1IentRPCHook(newSessionCredentia1s(AC1accesskey,Acusecretkey);consumer)ub1icc1assAC1ConsumerTestPriVateStatiCfina1StringAC1_ACCESS_KEY=nRocketMQ,PriVateStatiCfina1StringAC1_SECRET_KEY=”2345678PUb1iCS1a1iCvoidmain(S1ring口args)IhrowSMQC1ienIE1Defau1tMQ
7、pushConsumerconsumer=newDefau1tMQPushConsumer(r,testGroup02,getAcIRPCHook(),newA11OcateMessageQueueAVerageIy();Iconsumer.SetConsumeFromWhere(ConsumeFromWhere.CONSUMEICOnSUnIer.rcgisterMessage1istcner(newMeSSage1iStener1ICOnCUrren11y()IPUbIiCConsumeconcurrent1yStatusConSUmeMeS1ISage(1iStVMessageExtms
8、gs,Consumeconcurrent1yContextConteXt)ISySten.out,printf(%sReCeiVeNeWMeSSageS:%s”,Thread.CUrrenIThread().ge1Name(),msgs);IreturnConSUmeConCUrrent1yStatUS.CONconsumer,start();SySten1out.Printf(ConsumerStarteC1%n);StatiCRPCHoOkgetAc1RPCHook()returnnewACiCIentRPCHook(newSeSSionCredentiaI二、AC1源码分析在启动AC1功
9、能后,我们分别从broker端及客户端来分析其工作原理。1broker在启动时如何启用AC1?在broker启动过程中在对BrokerContro11er进行初始化的过程中会调用initia1Ac1(),这里该方法主要完成的操作如下:(1)首先判断broker端是否启用了AC1(2)如果broker端启用了AC1则会加载uMETA-INF/service/org.apache,rocketmq.ac1.AccessVa1idator”路径下所有实现了AccessVa1idator接口的实现类,这里该文件的内容是aorg.apache,rocketmq.ac1.p1ain.P1ainAccess
10、Va1idator,也就是说broker端采用P1ainAccessVa1idator作为默认的权限访问校验器(3)注册RPCHOOk,这里注册分为两个一个是向remotingServer注册一个是向fastRemotingServer注册,其本质是将RPCHook添加到rpcHooks(是个1ist)riVateVOidinitia1Ac1()1og,info(Thebrokerdosenotenab1eac1return;1istaccessVa1idators=ServicePrvider.IoacKServiceProvider.AC1_VA1IDAT()R_ID,AccessVa1i
11、dator.c1ass)Iif(accessVa1idators=nu11MaccessVa1idatorsI1og,info(ThebrokerdosenotIOadIheIACCeSSVa1ida1or);for(ACCeSSVaIidatOraccessVa1idator:accessVa1idafina1ACCeSSVa1idatorVa1idatOr二accessVdator;accessVa1ida1orMap.PUt(VaIidatOr.getC1ass(),vi1idator);this.registerServerRPCHook(newRPCHOOk()OverridPUb1
12、iCVoiddoBeforeReqUeSt(StringremoteAddr,RenIotingCommandrequest)/DonotCa1ehtheexceptva1idator,va1idate(va1idator.ParSe(request,remoteAddr);0VerridPUbIiCVoidC1OAf1erReSPonSe(Stringremo1eAddr,Ren1O1ingCommandrequest,RemOIingCommandresponse)看到这里有个问题:上面注册的RPeHook是在什么时候调用的?broker端在接收到客户端的请求后会按照服务端启动过程中添加的
13、hand1er来对请求进行处理,其中业务上的处理实现是NettySerVerHanCner,在NettySerVerHanCner中重写了ChanneIReaC1o方法,顺着该方法往里面深入研究可以发现:在ProcessRequestCommand方法中有调用上面注册的RPCIIook,即doBefOreRpcHooks方法,这里仅展示部分代码:IPUb1iCVOidProCeSSReqUeStComnIand(fina1Channei.Hand1erContextCtX1fina1RemOIingCOmmandCmd)Ifina1PairNe1IyRcquesIProcessor,EXCCu
14、1O1SeiViCCImatched二this.ProCeSSOrTab1e.get(endgetCode();Ifina1PairVNettyRequestProcessor,EXeCUtOrSerVice,IPair=nu11二二matched?this.C1efau1tRequestProcessor:matIfina1intOPaqUe二Cmd.getpaque();IRUrmab1erun二newRunnab1eOIPUbIiCVOidrun()IC1oBeforeRpcHooks(RemotingHe1per.ParSeChanne1_RemoteAddr(ctx.ChanneI(
15、),Cmd);Ifina1RenIOtingReSPon1seC1)i,k(,11i)ck=:v,J,eotiiigResponseC;!111);ick()OverridePUbIiCVOiCaI1baCk(Ren1otingCommandresponse)doAftTRPCHOOkS(RemotingHei,per.ParSeChanne1.RemoteAddr(Ctx.Channei(),Cn1dresponse);if!cmd.iSOnewayRPCO)if(response!=nu11)response.SetOPaqUe(OPaqUe);response.InarkResponseT