《Spark任务调度源码解析.docx》由会员分享,可在线阅读,更多相关《Spark任务调度源码解析.docx(20页珍藏版)》请在第一文库网上搜索。
1、Spark任务调度源码解析SPark作业通过资源调度系统获取了计算资源(请参考前文Spark作业资源调度源码解析),然后即开始调度计算任务来执行实际的数据处理(比如ET1,机器学习、图计算),本文继续来解析SPark任务调度的相关处理过程和原理。Action触发任务调度Spark是惰性计算模式,所有的transformation算子的实际执行都是通过action算子来触发的;action算子是划分job的分界,因此本文的任务调度以job为单位来解析Spark作业任务调度的实现。RDD的action算子(例如:co11ect、count、take、foreachPartition等)都会调用Sp
2、arkContext的runJob方法来提交一个job,以co11ect算子为例,该算子的实现如下:efco1Iect():ArrayT=WithSCoPeva1resu1ts=sc.runJob(this,(iter:IteratorT)=iter.Array.concat(resu1ts:*)在runJob方法中实际是调用了DAGSChedU1er的SubmitJob方法,然后通过eventProcess1oop对象将作业提交的事件放到了事件队列eventQueue中:Ijob1d,rdd,func2,PartitionS.toArray,Ca11Site,waiter,eventProc
3、ess1oop是在DAGSchedu1er初始化时创建的事件调度对象,该对象启动了一个线程不断从eventQueue中获取事件来处理:j1:Va1event=eventQueue.take()OnReceive(event)拿到JobSubmitted的事件后由Event1oop的子类DAGSchedu1erEventProcess1oop调用Hand1eJobSubmitted方法来进行处理,该方法的处理逻辑主要分为两步,第一步是划分Stage构建DAG有向无环图,第二步则是发起任务调度,本文的主要内容也将围绕这两点来展开:schedu1erJh,nd1cJ()I)Sub1ni1ted(jo
4、bI(1:IVarfina1Stage:ReSU11Stage二fina1Stage=CreateResu1tStage(fina1RDD,func,partitionssubmitStage(fina1Stage)划分StageDAG是一个有向无环图,SPark根据各RDD的数据来源、算子以及分区器等要素构建出不同类型的RDD对象(例如HadOOPRDD、Shuff1edRDD)、子RDD与父RDD之间的依赖关系集(OneToOneDependencyShuff1eDependencyRangeDependency)以及RDD分片的数据本地性属性等,进而构建出整个作业的DAGo由于SPark
5、的Stage是以ShUffIe也即宽依赖为边界进行划分的,有了DAG接下来就可以根据RDD之间的依赖类型来划分stage了o首先从resu1trdd开始向前回溯不断获取当前rdd的所有宽依赖列表,如果遍历到的依赖是宽依赖则放入ParentSHashSet中,如果遍历到的是窄依赖则继续遍历该窄依赖的rdd的所有dependency,直到找到下一个宽依赖并放到parents列表中或者遍历了其所有父rdd无法找父rdd为止,得到rdd所有的shuff1e依赖:rdd:RDD_):(HashSetShuff1eDependency1,VaIParentS二newHaShSetShuffIeDepend
6、ency_-I.resourceProfiIes二.I1UShSRcs()ui、(3)m门!c,.Va1WaitingFOrViSit=new1iStBUfferRDDWaitingFOrViSit+=rddWhiie(WaitingForVisiXnonEmpty)VaItoVisit二WaitingForViSit.remove(O)if(!visited(toVisit)visited+=toVisitOP1iOn(IOViSiI.ge1ResourceProfi1e).foreach(resourceProfi1es+=_)CaSeShUffIeDep:Shuff1eDependenc
7、y1,WaitingForVisit.PrePend(dependency.rdd)(parents,resourceProfi1es)然后根据该rdd的Shuff1eDeps列表中的每个shuff1e依赖创建出相应的父stage:rivatedefgetOrCreateParentStages(shuffIeDeps:I1ashSetShuff1eD)endency_,firstJob1d:Int):1iStStage=:I.1:IgetOrCreateShuff1eMapStage(shuffIeDep,firstJob1d)其中getOrCreateShuff1eMapStage方法采用
8、递归的逻辑,如果一个shuff1e依赖所属的rdd有未遍历到的shuff1e依赖,则创建该依赖所属的stage;否则,继续遍历该rdd的所有ShUffIe依赖,并创建各个ShUffIe依赖所对应的父stage:IgetMiSSingAnCeStorShUffIeDePendenCieS(ShUffIeDep.rdd).foreachdep在CreateShuff1eMapStage方法中可以看到stage的id是一个顺序递增的值id=nextStage1d.getAndIncrement(),由于是递归创建Stage因此越往前遍历stage的id越小,新创建的stage对象中的属性主要包括该s
9、tage的id、最后边的那个rdd及其分区数(决定了map任务数)、该Stage所有父Stage的集合、shuff1eCIePenenCy等:a1ParentS=getOrCrentsParentStages(ShUffIeDePS,job1d)I1.,,ijJII;:a1Stage二newIid,rdd,numTasks,parents,job1d,rdd.creationSite,ShUff1II1Ii【I,-当创建出了所有的Shuff1eMapStage之后则会创建Resu1tStage:a1Stage二newReSU1tStage(id,rdd,func,PartitiOns,Pare
10、ntI.!.17一,U,1,,K,,一:到目前为止完成了hand1eJobSubmitted方法中的fina1Stage=CreateResu1tStage(fina1RDD,func,partitions,job1d,ca1ISite)的逻辑,该job的DAG也完成了构建,接下来通过调用SUbnIitStage(fina1Stage)方法开始作业所有stage及其任务的调度过程。启动stage调度SubmitStage方法也是一个递归,由于job中的stage已经划分完成并且从前到后进行了编码,在本方法的逻辑中如果当前Stage的父Stage列表不为空,则调用其所有父stage的Submit
11、Stage方法;如果当前的stage的父stage为空则调用SubmitMissingTasks方法启动该stage中任务调度逻辑:privatedefSUbinitStage(Stage:Stage):Unit二IifQwaitingStages(Stage)&!runningStages(stage)IVaImissing二getMissingParentStages(stage).sortByII:II:,:一,I;!.,.:I;启动了Stage调度并没有真正地将任务分发到executor上去运行,接下来还需要创建任务集并根据特定的调度策略制定调度计划之后再执行任务的调度以及分发执行。S
12、tage之间的调度策略根据用户的配置来确定,包括FIFO和FA1R两种(默认是FIF0),并且在TaSkSChedUIer初始化时就根据作业的配置创建了不同stage调度器和调度队列:I.I:,I,,,,:I(r()()i1%si,IthrownewI1Iega1ArgUmentEXCePtion(SUnsupported$|Schedu1ab1eBui1der.bui1dPoo1s()创建TaskSets介绍了Stage的调度策略,接下来我们来看一下任务集创建的流程:1.获取该stage中各个数据分片的数据本地性,从下面的代码中可见生成的结果集taskIdT010cations是一个key为
13、分片id,va1ue为本地性列表的HashMap:va1task1dTo1ocations:MapInt,SeqTask1ocation=tryICaSes:ShUft1eMaPStageIPartitionSTOComPUte.mapid=(id,getPreferred1ocICaSes:ReSU1tStage二.1:1,(11Ir-.,其中分片的本地性的计算方法主要通过getPreferrec11ocsInterna1方法实现,如果RDD已被缓存则返回所有分片被缓存的TaSk1OCation信息并返回,如果RDD的分片本身具有本地性属性则返回其本地性的TaSk1oCatiOn信息并返回,
14、如果不满足以上两种情况则不断获取该RDD窄依赖对应的父RDD调用getPreferrec11ocsInterna1方法继续计算,如果最后没有获得任何本地性信息则该RDD不具有本定性:,.getP2jij2c11s1ntcHHMmBBMvisited:HashSet(ROD_,Int):SVaICaChed=getCache1ocs(rdd)(PartitiOn)(.Ni12.将待处理的task封装为一个TaSkSet对象,该对象中携带了Stage1d、任务的代码闭包(包括rdd、Stage的ShUffIe依赖关系或者action算子信息)、数据分片、分片的数据本地性信息等:Va1tasks:SeqTask_=tryVaISeria1izedTaskMetrics=C1osureSeria1izer.SeriaiiZe(Stage.IateSt1nfo.taskMetric