《Spark Shuffle原理及源码解析.docx》由会员分享,可在线阅读,更多相关《Spark Shuffle原理及源码解析.docx(32页珍藏版)》请在第一文库网上搜索。
1、SparkShuff1e原理及源码解析Shuff1e的产生在Spark作业中当父RDD与子RDD的分区对应关系为多对多或者一对多的情况下会发生宽依赖,也即一个父RDD的分区需要分发到多个子RDD所在的任务中去执行,这种情况就会涉及数据的重新分布,也即产生了ShUffIe。Spark算子是否引入ShUff1e与各算子的具体实现有关,本质上是要看父子RDD的分区器的实现是否相同,例如:在执行聚合类算子reduceByKey时判断是否会引入shuff1e,需要分析父子rdd的分区器partitioner是否一致,如果不一致则创建一个ShUffIeRDD作为子RDD从而产生shuff1e:CrOatC
2、Combinor:二(,BiergeCombiners:(C,C)=C,seria1izer:SeriaiiZer二nu11)(imp1icitct:C1aSSTa1I)*:、)M二二,!i()*)h()IJ;.:一,IInewInterrUPtib1e1terator(Context,aggregator,combiI:1.,,:,-:I,Idr1,!:,-:,;:I.I7MUid:工1;O;,:1II11.,:111类似的再以join算子的实现为例,在CoGroupedRDD的getDependencies方法中遍历所有的父rdd,如果父rdd和子rdd的分区器一致则创建OneToOneD
3、ependency,否则创建Shuff1eDependency并引入shuff1e:MerridedefgetDependencies:SeqDependency_=rdds.maprdd:RDD=IiJ.:小:IogDebUg(AddingShUffIedependencyWith+rdd),newShUffIeDePendCneyK,Any,COGrOu1)CoInbinCrrdd.asInstanceOfRDD_:Product2K,_,part,ShUff1e文件的生成我们首先来看ShUffIe的第一个阶段:ShUffIeWrite阶段。通过SPark任务调度原理的讲解,我们知道在创建
4、taskset的过程中,如果当前的Stage是Shuff1eMapStage,则创建的任务类型为Shuff1eMapTask,否则task的类型为Resu1tTask,两种类型的TaSk均实现了runTask方法;如果发生了ShUffIe则执行Shuff1eMapTask实现的runTask方法,即根据rdd、dependencymapId等信息调用Shuff1ewriteProcessor的write方法执行shuff1e数据的写入:Va1rdd=rddAndDep.Va1dep=rddAndDep.N/WhiIeWeUSetheO1dShUff1efetchPrOtoCo1邛eUSePaI
5、deP.Shuff1eWriterProcessor,write(rdd,dep,map1d,context,Partit选择Shuff1eWriter在Shuff1ewriteProcessor的write方法中首先通过Shuff1eManager获取writer实例,然后再由相应的writer执行具体的write逻辑:IefWritedep:Shuff1eDependencyI1,_,pur1i()n:,;.):,:,【Varwriter:ShUffIeWriterAny,Any二nu11,Va1Inanager=SparkEnv.get.Shuff1eManageijmmjj1,;:,r
6、:、:rdd.i1era1or(par1i1ion,COnteXI).as1ns1ancefI1era1orSpark根据Shuff1eHand1e的不同采用相应的Shuff1eWriter的实现,包括:UnsafeShuff1eWriter.BypassMergeSortShuff1eWriter和SortShuff1eWriter三种:context:TaSkContext,metrics:):K,va1mapTaskTds=taskTdMapsForShuff1e.ComputeTfAbsentnewByPaSSMergeSorIShUffIewrit_er(caseother:Base
7、ShuffIeI1anc11eKunchecked,VuncheckecnewSortShuff1eWriteNother,map1d,context,ShUff1eEXeCUtorCOnIPonentS)而具体的Shuff1e1Iand1e的选择是根据shuff1e算子实际的partition数、是否需要执行排序或者聚合等情况来确定的:dependency:K):Iif(SortShuff1eWriter.Shou1dBypassMergeSort(conf,C1ePenCIenCy)newBypassMergeSortShuff1eHand1eK,V(Shuff1eTd,dependenc
8、y,asInstancefShuff1eDependencyK,Ie1seif(SOrtShUffIeManager.CanUseSeria1iZedShuff1e(dependenIShUff1eId,dependency,as1nstancefShuff1eDependencyK,r-1.1.,1,ii.(shu1,f1e1c1,doD(?nden(r)下面分别介绍这三种Shuff1eHand1e的选择逻辑:1、BypassMergeSortShuff1e11and1eBypassMergeSortShuff1eHand1e对应BypassMergeSortShuff1eWriter,当不
9、需要做map端的聚合,并且分区数小于SHUFF1E_SORTBYPASSJfERGETHRESHO1D(默认200)时采用这种方式,可以跳过在内存中排序和聚合的过程:dep.partitioner.numPartitionsaggregateByKey.CombineByKey等mapSideCombine属性为true;注意执行groupByKey算子时该属性也为fa1se:efgroupByKey(partitioner:Partitioner):RDD(K,IterabIeY)ICreateCOnIbiner,mergeVa1ue,mergeCombiners,ParIitioner,3
10、)shuff1edependency的partition数小于MAX_SHUFF1E_OUTPUT_PARTITIONS_FOR_SERIA1IZEDJfODE,即16777215o3、如果无法使用前两种ShUffIeHandie,则采用BaSeShUff1eHandie,对应Shuff1eWriter为SortShuff1eWritero综合以上分析,Spark根据Shuff1eHand1e的不同而选择相应的Shuff1eWriter的实现,接下来我们来详细阐述这三种ShUffIeWriter中最为典型的实现方式SortShuff1eWriter的执行原理,在后续的Spark内存管理的文章中我们将对UnsafeShuff1eWriter以内存的角度进行阐述;而BypassMergeSortShuff1eWriter则是SortShuff1eWriter的特殊情况,即跳过了map排序和聚合部分。SortShuff1eWriterSortShuff1eWriter通过insertA11方法首先将参与shuff1e的数据写入到ShUff1e缓存列表中,当缓存列表的空间增大到无法继续写入时则将数据溢写到磁盘中。Shuff1e缓存列表的实现有两种数据结构:如果参与sh