Spark的运行流程详解_Lyy1016的博客-程序员信息网

技术标签: ui  大数据  

一、Spark提交应用任务的四个阶段:

 

总共提交的任务分为四个阶段,提交+执行:

 

1、在分配完毕executor以后,解析代码生成DAG有向无环图;

 

2、将生成的DAG图提交给DAGScheduler,这个组件在driver内,DAGScheduler负责切分阶段,按照DAG图中的shuffle算子进行stage阶段的切分,切分完毕阶段以后,按照每个阶段分别生成对应task任务的集合,将每个阶段所有的task任务放入到对应的set集合中,提交这个set集合,即一次性提交每个stage阶段的所有任务(每个阶段准备好就提交哪个阶段)

 

3、将任务的集合提交给taskScheduler【driver端组件】,这个组件会将数据通过集群管理器提交给集群(executor),对任务进行监控,分配资源,负责提交,负责执行,负责故障重试,负责落后任务的重启

 

4、真正提交到executor端,在executor中进行执行,保存执行过后的数据,或者存储数据

 

 

二、Spark的运行流程详解【重点】:

spark-submit 提交命令的解析:

 

 

 

通过查看底层,我们可以了解到,使用spark-submit方法提交任务的时候的时候,实际上是运行的 Spark内的 SparkSubmit 类

提交任务的命令:

spark-submit --master xxx --class xxx --name xxx xxx.jar input   output

  我们使用命令提交任务的时候,设置的参数实际上就是给SparkSubmit 的参数;

A、查看SparkSubmit的源码:spark-submit 提交任务时,实际上是运行sparksubmit中的main方法

 

 

spark-submit 提交任务时,实际上是运行sparksubmit中的main方法:

所以--master xxx --class com.bw.spark.wordcount --name xxx xxx.jar input output 这些东西都是main方法的参数

 

main方法中会接收传入的参数,将传入的参数解析后,使用匹配模式,判断输入的参数并执行不同的操作;

 

 

 

1、submit 提交一个应用任务:

spark-submit --master spark://master:7077 --class com.bw.spark.wordcount --name xxx xxx.jar input output

 

 

A、可以看到在 submit 方法内解析出了四个参数:①

a)      childArgs == 子类的参数列表,是提交的参数列表内的 输入参数【input】 和 输出路径 【output】 ,数据用于要运行的jar包内的main方法

b)     childClasspath == 子类的类的路径,是 --class 的参数【com.bw.spark】 ,数据用于SparkSubmit 类中的main方法

c)      sysProps == 系统属性,即 --master 的参数,指定集群模式【spark://master:7077】,数据用于SparkSubmit 类中的main方法

d)     childMainClass == --class 参数中类内的入口(main方法)【wordcount 】,数据用于SparkSubmit 类中的main方法

由SparkSubmit类中的main方法管理并启动自定义的类中的main方法

 

B、通过 runMain 方法使用解析出来的参数继续向下执行②

 

 

 

C、通过反射机制,将mainClass的字符串转换成一个主类

 

 

 

D、根据主类找到这个类中的main方法

 

 

 

E、通过反射机制执行main方法,将传递近来的参数放置到main方法中执行

 

 

 

总结:其实任务的提交就是运行main方法,解析代码解析main方法,解析到此进入等待状态 

 

2、开始初始化driver端的东西,初始化上下文 ①sparkContext ②DAGScheduler ③TaskScheduler【属于预提交的阶段】

A、 SparkContext中需要初始化的组件:

 

 

B、 在提交任务的时候就要初始化的重要组件:

  1. DAGScheduler 【划分DAG并按阶段提交】
  2. TaskScheduler  【任务的向集群提交,监控执行】
  3. SchedulerBackEnd 【提交任务的通信组件】

 

1--2、根据部署的集群模式不一样。创建不同的DAGScheduler和TaskScheduler

 

 

 

3、根据部署模式不一样创建的SchedulerBackEnd也不一样,根据资源分配不同的核数:

本地模式时,根据设置的线程数创建不同的SchedulerBackEnd;

集群模式时,不需要设置,线程数由集群管理;

 

 

 

 

 

3、组件的创建实例完毕以后,开始解析代码:

driver初始化完成以后开始解析代码,(executors已经启动),记录textFile从什么位置开始读取数据,记录每个算子生成rdd的数量,分区个数,逻辑,各个rdd之间的血缘关系,只有遇见真正的action算子才开始执行,会生成DAG有向无环图,rdd就是点,算子就是线;

 

A、开始将DAG有向无环图提交给DAGScheduler进行阶段的切分:

从saveASTextFile开始进入,找到最后一步,可以发现是将任务提交给DAGScheduler,进行任务阶段的切分:

 

 

 

 

 

 

 

 

 

 

 

 

 

B、到DAGScheduler中进行任务的切分阶段,将每一个准备好的阶段提交给TaskScheduler

 

Ⅰ、在DAGScheduler中的doOnReceive方法接收传进DAGScheduler的任务,进行任务的处理

 

 

 

Ⅱ、DAGScheduler中负责将任务进行拆分,按照shuffle算子进行拆分不同的stage

 

❶ 找到最后一个RDD,创建一个resultStage:

 

 

❷ 根据最后一个resultStage,找到这个result的父stage,如果父stage为空,那么表明已经到了最开始的stage,直接进行提交

如果父stage不为空,那么继续调用自身以递归的形式进行倒序的向前排查,直到找到初始stage为止;

 

 

 

❸ Stage的划分:通过最后一个rdd向前推,如果这个RDD是宽依赖就将stage+1,如果是窄依赖就将当前stage阶段中的rdd+1;当每个阶段都推衍完毕以后,将每个阶段中的所有的task组成一个taskSet。

 

 

 

❹将每个阶段中的所有的task组成一个taskSet集合:

每个阶段匹配一下,如果是shuffleMapStage就组装一个集合,这个集合中装入的都是shuffleMapTask;如果是resultStage那么这个stage中装入的就是resultTask;

 

 

 

 

C、将任务集合提交给TaskScheduler,TaskScheduler进行任务的提交到集群中,然后执行操作,负责监控,申请资源,故障重试

 

 

 

 

 TaskScheduler是一个接口:

 

 

 

TaskScheduler的其中一个实现类【TaskSchedulerImpl】:

实现类中存在各项组件方法,实现对任务进行各项初始化与管理,在配置好任务之后提交给Executor 执行;

 

 

ctrl+alt+b找到TaskScheduler接口的实现类【TaskSchedulerImpl】:

在taskSchedulerImpl中通过submitTasks方法将任务提交SchedulerBackEnd组件进行提交任务:

 //提交任务的一个组件:
 override def submitTasks(taskSet: TaskSet) {

  //取出集合内所有的Task任务
  val tasks = taskSet.tasks

  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")

  this.synchronized {
    
    //创建一个Stage管理器,对不同的stage阶段的task任务集合进行管理
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    
    //得到当前处理的stage是第几个阶段的
    val stage = taskSet.stageId

    val stageTaskSets =

      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])

    stageTaskSets(taskSet.stageAttemptId) = manager

    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>

      ts.taskSet != taskSet && !ts.isZombie

    }

    if (conflictingTaskSet) {

      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +

        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")

    }

    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

  

    if (!isLocal && !hasReceivedTask) {

      starvationTimer.scheduleAtFixedRate(new TimerTask() {

        override def run() {

          if (!hasLaunchedTask) {

            logWarning("Initial job has not accepted any resources; " +

              "check your cluster UI to ensure that workers are registered " +

              "and have sufficient resources")

          } else {

            this.cancel()

          }

        }

      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)

    }

    hasReceivedTask = true

  }
  
  //将所有的task取出之后,使用 SchedulerBackEnd 通信方式将任务提交给executor执行
  backend.reviveOffers()

}
 

  

 

D、SchedulerBackEnd 通信组件:接收TaskScheduler 发送的任务,将任务转发给 executor

SchedulerBackEnd 是一个接口,这个接口存在两个实现类,一个是本地通信的实现类,一个是负责集群通信的实现类

 

 

 

集群模式的通信方式:

1、接收TaskScheduler 发送的任务

2、遍历将接收到的TaskScheduler 任务进行序列化以用于传输

3、通过rpc协议进行任务传输,到executor端

 

 

 

4、运行任务:【executor开始执行任务】

A、 executor的本质就是线程池

 

 

 B、 Task类包装为多线程类:

executor 执行器就是一个线程池,但是这个线程池中能够执行的只有多线程的类,而task任务不是多线程的,所以用一个taskRunner多线程的类对task进行包装后,Task就成为了多线程的,可以放入到线程池中运行!!!!

 

C、Task在executor中的运行流程:

在运行任务的时候调用taskRunner中的run方法,先进行任务的反序列化,计算时间,以及分配资源,然后交给执行器进行执行,将执行完毕的任务从Taskset中去除

 

 

D、执行器【runTask方法】:任务真正运行的地方

 

task 分为 shuffleMapTask 和 resultTask 两种,都在task中调用runTask方法执行:

 

 

5、任务管理:

driver端是所有应用的老大,他会管理每一个executor中的任务执行;监听,数据管理,任务重启。。。。 TaskScheduler

所有的组件,以及代码,变量等数据都在driver端,只有运行的时候会被传送至executor端,因此在driver中运行的程序+变量,都需要被实例化

 

三、spark任务的四大调度

1、application 

spark-submit spark-shell提交的任务就是一个应用,会生成一个application

2、job

遇见一个action算子就会生成一个job 

3、stage 

遇见shuffle就会切分stage,  stage 数量 = shuffle算子数+1

4、task

运行任务的最小单位,一个stage中最后一个rdd的分区数量就是这个stage中task任务的个数

 

四、几个重要的数值:

  1. 读取外部文件的时候,rdd的分区数量是,这个被读取的文件存在多少个block快就有多少个分区;但是当文件只有一个分区时,产生两个分区;
  2. 每个stage中task的个数取决于最后一个rdd的分区数量
  3. 写入到hdfs中的文件个数(saveAsTextFile),是存储的rdd的分区数量
  4. 一个job每个能够运行多少个task任务?这个job区段内每个stage中的最后一个rdd的分区的总和
  5. 同时并行能够运行多少task任务?集群中总核数,一个线程对应一个Task任务,如果任务数量比总的核数多,则等待执行
  6. 带有shuffle的算子切分stage,产生的依赖是宽依赖;判断是否是宽依赖的最简单依据就是看算子是否会产生shuffle;
  7. 不带shuffle的算子,都在一个stage内,产生依赖是窄依赖
  8. 在整个应用任务流程中,action行动类算子会产生job任务使懒加载开始执行;shuffle算子切割stage,产生不同的pipeline管道形式的stage提交阶段;两者不是对等关系,job 的阶段  ≠  stage的阶段;

 

转载于:https://www.cnblogs.com/Komorebi-john/p/11263945.html

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Lyy1016/article/details/101923784

智能推荐

2021-07-07mathtype宏相关问题解决方案_单线程生物在哪儿的博客-程序员信息网_mathtype禁用宏

这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入欢迎使用Markdown编辑器你好! 这是你第一次使用 Markdown编辑器 所展示的欢迎页。如果你想学习如何使用Mar

通过post向指定URL地址访问爬取数据_未来的农场主的博客-程序员信息网_通过post访问url

一:直接上代码,控制层//这是下面要用的HOST,里面的数据是F12在页面看到的,例如:127.0.0.1:8088private final static String HOST = PropertiesUtil.getDocking("url:端口号");@RequestMapping("findPackageDetails") public void findPackageDetails() { // 根据固定地址获取cookieString param ="U.

[享学Feign] 九、Feign + OkHttp和Feign + Apache HttpClient哪个更香?_方向盘(YourBatman)的博客-程序员信息网

前八篇文章介绍完了feign-core核心内容,从本篇开始将介绍它的“其它模块”。其实核心模块可以独立的work,但是不免它的能力偏弱,比如只能编码字符串类型、只能解码字符串类型,默认使用`java.net.HttpURLConnection`作为HC...本篇将介绍它的第一个模块:Client相关模块。我们知道,流行的开源Http库的性能均远高于JDK源生的`HttpURLConnection`,因此实际生产中**肯定是**用的三方库来发送Http请求。Feign它提供了`feign.Client

JS 作用域和作用域链_destinytaoer的博客-程序员信息网_js作用域和作用域链

1. 作用域作用域就是代码的执行环境,全局执行环境就是全局作用域,函数的执行环境就是私有作用域,它们都是栈内存。执行环境定义了变量或函数有权访问的其他数据,决定了它们各自的行为。每个执行环境都有一个与之关联的变量对象,环境中定义的所有变量和函数都保存在这个对象中。虽然我们编写的代码无法访问这个对象,但解析器在处理数据时会在后台使用它。全局执行环境是最外围的一个执行环境。根据 ECMAS...

构建安全的dDoS(拒绝分布式DoS攻击) dns服务_iteye_10303的博客-程序员信息网

近日发现DNS bind9.9.1 存在严重的安全漏洞 .tail -f dns.log 发现大量不同网段ip 请求whbl.com 域名 .打开该网站看了看是个国外新闻网站. 同过日志分析发现攻击者使用了DoS攻击,采用了大量僵死云向我们服务器发起dns请求,请求频率超过2次/秒. 网上查了,有的出来个补丁但是都是针对9.3.2以下版本的.   这些请求占用了带宽.在9.9...

linux系统改装win10教程,技术编辑为你解答win10系统安装Linux Mint的图文方案_苏慕凉的博客-程序员信息网

win10系统稳定性好,使用者众多;免不了会遇到win10系统安装Linux Mint这样的问题要处理,大部分伙计都是第一次看到win10系统安装Linux Mint这样的事情,有的朋友想试着解决一下win10系统安装Linux Mint的问题却不知怎样下手,其实解决方法很简单,只需采取以下两步措施:1、你需要安装新的操作系统之前做明显的事情。第一个也是最重要的一点是要备份你坐在你的计算机上的任何...

随便推点

nRF51822_nicole088的博客-程序员信息网_nrf51822

System OFF modesystem off mode是最深省电模式。 在此模式下,系统的核心功能已关闭,所有正在进行的任务都将终止。 唯一有效的机制在这种模式下响应是复位和唤醒机制。在系统关闭模式下保留一个或多个RAM块可以通过以下方式进行唤醒:1、GPIO信号2、LPCOMP模块3、复位中唤醒在进入系统关闭模式之前,用户必须确保所有正在进行的EasyDMA交易已完成。 这通...

SpringBoot 使用JDBC_風栖祈鸢的博客-程序员信息网_springboot使用jdbc

SpringBoot 使用JDBC我超,又要开始了。太久不写,全忘完了,就当 Remake 了!1. 使用JDBCSringBoot 对数据层进行访问,无论是 SQL(关系型数据库)还是 NoSQL(非关系型数据库),其底层都是采用 Spring Data 的方式进行处理的。按照之前的笔记,创建 SpringBoot-05-Data 项目,然后添加数据相关的依赖,先只添加基础的 JDBC API 和 MySQL Driver,学习一下。 进入项目,查看 pom.xml 文件,可以看到

micropython 中断_[Micropython]TPYBoardV10X教程6 按键开关,回调函数和中断_weixin_39887546的博客-程序员信息网

原创版权归山东萝卜科技有限公司所有,转载必须以链接形式注明作者和原始出处。tpyboard 开发板上有两个小按键,分别标示为 USR 和 RTS。RTS 按键属于复位按键,如果按下的话将重新擦写重启开发板,相当于将开发板断电再重启。USR按键供用户使用,且其可以通过声明一个按键对象(Switch object)进行控制。创建开关对象的方法如下:>>>sw=pyb.Switc...

网络组Network Teaming_weixin_30555753的博客-程序员信息网

网络组: 将多个网卡聚合在一起,从而实现冗错和提高吞吐量 网络组不同于旧版中bonding技术,提供更好的性能和扩展性 网络组由内核驱动和teamd守护进程实现.有以下不同方式:runner1.roundrobin 【mode 0】轮转策略 (balance-rr)特点:1)从头到尾顺序的在每一个slave接口上面发送数据包,轮询方式往...

51Nod 1191 消灭兔子 (贪心+优先队列)_shiyicode的博客-程序员信息网

题目链接:消灭兔子题目大意 n个兔子,每个兔子都有一个血量b[i] m种箭(每种各一支),每种箭都有伤害值d[i]和价格p[i] 每个兔子只能被射一次,伤害值大于血量则死,每种箭只能用一次 问杀死所有兔子需要的最小价格为多少,若不能杀死,则No Solution m,n小于50000)思路 典型的贪心,每个兔子只能射一次,所以只能用伤害值大于其血量的箭,在此前提下,

2018秋寒假作业3—秋季学期学习总结_aobi5342的博客-程序员信息网

希望通过下学期的学习能够增强自己的自主编程能力。转载于:https://www.cnblogs.com/jk-liulei/p/10396854.html