Spark Shuffle原理及源码解析.docx

上传人:lao****ou 文档编号:406589 上传时间:2023-10-25 格式:DOCX 页数:32 大小:105.18KB
下载 相关 举报
Spark Shuffle原理及源码解析.docx_第1页
第1页 / 共32页
Spark Shuffle原理及源码解析.docx_第2页
第2页 / 共32页
Spark Shuffle原理及源码解析.docx_第3页
第3页 / 共32页
Spark Shuffle原理及源码解析.docx_第4页
第4页 / 共32页
Spark Shuffle原理及源码解析.docx_第5页
第5页 / 共32页
亲,该文档总共32页,到这儿已超出免费预览范围,如果喜欢就下载吧!
资源描述

《Spark Shuffle原理及源码解析.docx》由会员分享,可在线阅读,更多相关《Spark Shuffle原理及源码解析.docx(32页珍藏版)》请在第一文库网上搜索。

1、SparkShuff1e原理及源码解析Shuff1e的产生在Spark作业中当父RDD与子RDD的分区对应关系为多对多或者一对多的情况下会发生宽依赖,也即一个父RDD的分区需要分发到多个子RDD所在的任务中去执行,这种情况就会涉及数据的重新分布,也即产生了ShUffIe。Spark算子是否引入ShUff1e与各算子的具体实现有关,本质上是要看父子RDD的分区器的实现是否相同,例如:在执行聚合类算子reduceByKey时判断是否会引入shuff1e,需要分析父子rdd的分区器partitioner是否一致,如果不一致则创建一个ShUffIeRDD作为子RDD从而产生shuff1e:CrOatC

2、Combinor:二(,BiergeCombiners:(C,C)=C,seria1izer:SeriaiiZer二nu11)(imp1icitct:C1aSSTa1I)*:、)M二二,!i()*)h()IJ;.:一,IInewInterrUPtib1e1terator(Context,aggregator,combiI:1.,,:,-:I,Idr1,!:,-:,;:I.I7MUid:工1;O;,:1II11.,:111类似的再以join算子的实现为例,在CoGroupedRDD的getDependencies方法中遍历所有的父rdd,如果父rdd和子rdd的分区器一致则创建OneToOneD

3、ependency,否则创建Shuff1eDependency并引入shuff1e:MerridedefgetDependencies:SeqDependency_=rdds.maprdd:RDD=IiJ.:小:IogDebUg(AddingShUffIedependencyWith+rdd),newShUffIeDePendCneyK,Any,COGrOu1)CoInbinCrrdd.asInstanceOfRDD_:Product2K,_,part,ShUff1e文件的生成我们首先来看ShUffIe的第一个阶段:ShUffIeWrite阶段。通过SPark任务调度原理的讲解,我们知道在创建

4、taskset的过程中,如果当前的Stage是Shuff1eMapStage,则创建的任务类型为Shuff1eMapTask,否则task的类型为Resu1tTask,两种类型的TaSk均实现了runTask方法;如果发生了ShUffIe则执行Shuff1eMapTask实现的runTask方法,即根据rdd、dependencymapId等信息调用Shuff1ewriteProcessor的write方法执行shuff1e数据的写入:Va1rdd=rddAndDep.Va1dep=rddAndDep.N/WhiIeWeUSetheO1dShUff1efetchPrOtoCo1邛eUSePaI

5、deP.Shuff1eWriterProcessor,write(rdd,dep,map1d,context,Partit选择Shuff1eWriter在Shuff1ewriteProcessor的write方法中首先通过Shuff1eManager获取writer实例,然后再由相应的writer执行具体的write逻辑:IefWritedep:Shuff1eDependencyI1,_,pur1i()n:,;.):,:,【Varwriter:ShUffIeWriterAny,Any二nu11,Va1Inanager=SparkEnv.get.Shuff1eManageijmmjj1,;:,r

6、:、:rdd.i1era1or(par1i1ion,COnteXI).as1ns1ancefI1era1orSpark根据Shuff1eHand1e的不同采用相应的Shuff1eWriter的实现,包括:UnsafeShuff1eWriter.BypassMergeSortShuff1eWriter和SortShuff1eWriter三种:context:TaSkContext,metrics:):K,va1mapTaskTds=taskTdMapsForShuff1e.ComputeTfAbsentnewByPaSSMergeSorIShUffIewrit_er(caseother:Base

7、ShuffIeI1anc11eKunchecked,VuncheckecnewSortShuff1eWriteNother,map1d,context,ShUff1eEXeCUtorCOnIPonentS)而具体的Shuff1e1Iand1e的选择是根据shuff1e算子实际的partition数、是否需要执行排序或者聚合等情况来确定的:dependency:K):Iif(SortShuff1eWriter.Shou1dBypassMergeSort(conf,C1ePenCIenCy)newBypassMergeSortShuff1eHand1eK,V(Shuff1eTd,dependenc

8、y,asInstancefShuff1eDependencyK,Ie1seif(SOrtShUffIeManager.CanUseSeria1iZedShuff1e(dependenIShUff1eId,dependency,as1nstancefShuff1eDependencyK,r-1.1.,1,ii.(shu1,f1e1c1,doD(?nden(r)下面分别介绍这三种Shuff1eHand1e的选择逻辑:1、BypassMergeSortShuff1e11and1eBypassMergeSortShuff1eHand1e对应BypassMergeSortShuff1eWriter,当不

9、需要做map端的聚合,并且分区数小于SHUFF1E_SORTBYPASSJfERGETHRESHO1D(默认200)时采用这种方式,可以跳过在内存中排序和聚合的过程:dep.partitioner.numPartitionsaggregateByKey.CombineByKey等mapSideCombine属性为true;注意执行groupByKey算子时该属性也为fa1se:efgroupByKey(partitioner:Partitioner):RDD(K,IterabIeY)ICreateCOnIbiner,mergeVa1ue,mergeCombiners,ParIitioner,3

10、)shuff1edependency的partition数小于MAX_SHUFF1E_OUTPUT_PARTITIONS_FOR_SERIA1IZEDJfODE,即16777215o3、如果无法使用前两种ShUffIeHandie,则采用BaSeShUff1eHandie,对应Shuff1eWriter为SortShuff1eWritero综合以上分析,Spark根据Shuff1eHand1e的不同而选择相应的Shuff1eWriter的实现,接下来我们来详细阐述这三种ShUffIeWriter中最为典型的实现方式SortShuff1eWriter的执行原理,在后续的Spark内存管理的文章中我们将对UnsafeShuff1eWriter以内存的角度进行阐述;而BypassMergeSortShuff1eWriter则是SortShuff1eWriter的特殊情况,即跳过了map排序和聚合部分。SortShuff1eWriterSortShuff1eWriter通过insertA11方法首先将参与shuff1e的数据写入到ShUff1e缓存列表中,当缓存列表的空间增大到无法继续写入时则将数据溢写到磁盘中。Shuff1e缓存列表的实现有两种数据结构:如果参与sh

展开阅读全文
相关资源
猜你喜欢
相关搜索

当前位置:首页 > 应用文档 > 汇报材料

copyright@ 2008-2022 001doc.com网站版权所有   

经营许可证编号:宁ICP备2022001085号

本站为文档C2C交易模式,即用户上传的文档直接被用户下载,本站只是中间服务平台,本站所有文档下载所得的收益归上传人(含作者)所有,必要时第一文库网拥有上传用户文档的转载和下载权。第一文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对上载内容本身不做任何修改或编辑。若文档所含内容侵犯了您的版权或隐私,请立即通知第一文库网,我们立即给予删除!



客服