原创

死磕spark中的job、stage、task

写在前面

台风夜的电话面试里被问到了spark运行任务的过程中stage的划分依据。一下子就给整懵了,支支吾吾答非所问。从事大数据的开发也有一年半光景,spark任务的运行原理依旧知之甚少。因此就参阅各种优秀的文章,再配上一个自己工作中的实际项目,特意整理出这篇笔记,以此警示自己的自大与无知。

测试环境

本地开发环境

  • idea 2019.1.2
  • maven 3.6
  • spark 2.4.3
  • scala 2.1.8
  • jdk1.8

测试集群环境

  • spark 2.4.3
  • scala 2.1.8
  • jdk1.8
  • hadoop2.7.4

集群

测试项目例子

计算某一天店铺销售额\时段销售额top10

样例数据字段格式

filed1|filed2|filed3|store_no|filed5|filed6|filed7|filed8|amount|filed10|filed11|sale_time

这里不提供具体的测试数据,实验过程中需要自己模拟所用的数据。

样例demo

package com.dr.leo

import com.dr.leo.utils.StrUtils
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.SparkSession

/**
  * @author leo.jie (weixiao.me@aliyun.com)
  * @organization DataReal
  * @version 1.0
  * @website https://www.jlpyyf.com
  * @date 2019-07-28 20:29
  * @since 1.0
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("WordCount")
      //.master("local[*]")
      .enableHiveSupport()
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    val fileRddOri = loadFileToRdd(sc, "hdfs://leo/test/pos.DAT")
    val fileRdd = fileRddOri.map(x => for (data <- x._2.split("\\|")) yield if (data == null) "" else data.trim)
      .filter(x => x.length == 12)
      .map(x => (retailer_shop_code(x(3)), x(10).substring(10, 13), x(8).toFloat))
      .map(x => ((x._1, x._2), x._3))
      .reduceByKey(_ + _)

    fileRdd.top(10)(Ordering.by(e => e._2)).foreach(println(_))
    println("##########################################################")
    fileRdd.map(x => (x._1._1, x._2)).reduceByKey(_ + _).top(10)(Ordering.by(e => e._2)).foreach(println(_))
    println("##########################################################")
    println(fileRdd.count())
    println("##########################################################")
    println(fileRdd.first())
    println("##########################################################")
    fileRdd.take(10).foreach(println(_))
    while (true) {
      ;
    }
    spark.stop()
  }

  /**
    * 读取gbk编码的file
    * @param sc
    * @param path
    * @param encoding
    * @return
    */
  def loadFileToRdd(sc: SparkContext, path: String, encoding: String = "GBK"): RDD[(String, String, Int)] = {
    sc.hadoopFile[LongWritable, Text, TextInputFormat](path)
      .asInstanceOf[HadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
        val file = inputSplit.asInstanceOf[FileSplit]
        iterator.filter(x => x._2 != null).map(x => {
          (file.getPath.getName, new String(x._2.getBytes, 0, x._2.getLength, encoding), 1)
        })
      })
  }

  /**
    * 只是一个店铺号转换的函数
    * @param retailer_shop_code
    * @return
    */
  def retailer_shop_code(retailer_shop_code: String): String = {
    if (StrUtils.isBlank(retailer_shop_code)) ""
    else if (retailer_shop_code.length == 5) retailer_shop_code.substring(0, retailer_shop_code.length - 1).toUpperCase()
    else if (retailer_shop_code.length == 6) retailer_shop_code.substring(0, retailer_shop_code.length - 2).toUpperCase()
    else if (retailer_shop_code.length == 8) retailer_shop_code.substring(0, retailer_shop_code.length - 2).toUpperCase()
    else retailer_shop_code
  }
}

运行测试

程序打包后发往集群提交任务。所用命令

[hadoop@node1 leo_demo]$ spark-submit --master yarn --deploy-mode cluster --driver-memory 2G --driver-cores 2 --executor-memory 2g --num-executors 5 --executor-cores 2 --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.network.timeout=30000 --class com.dr.leo.WordCount leo-study-spark.jar

spark-ui上的信息告诉了我们什么?

查看任务的运行信息

spark任务运行的过程中,我们可以点击 ApplicationMaster 跳转任务运行的界面。

yarn

运行流程之:job

job

此时我们提交的任务的所有job都已经运行成功,只因为程序中任务执行完毕后是一段无限循环,所以这个界面会一直存在,直到我们手动在yarn上kill掉这个application。

我们写的代码被提交运行的过程中,会先被划分为一个又一个job,这些job按照被划分的先后顺序会依次执行。

图示中我们已经知道,我们提交的任务,最终被划分成了5个job。


所谓一个job,就是由一个rdd action触发的动作。简单理解为,当你需要执行一个rdd的action操作的时候,就会生成一个job。

这里不会赘述什么是rdd的action操作。

结合代码与图示我们可以知道:

job-0 的产生是由于触发了top操作

top at WordCount.scala:33

job-1 的产生是由于触发了top操作

top at WordCount.scala:35

job-2 的产生是由于触发了count操作

count at WordCount.scala:37

job-3 的产生是由于触发了first操作

first at WordCount.scala:39

job-4 的产生是由于触发了take操作

take at WordCount.scala:41

运行流程之:stage

选择任意一个job,点击链接去查看该job的detail,这里我们选择job-0。

job-detail

由图示我们可知,job-0由两个stage组成,并且每个stage都有8个task,说明每个stage的数据都在8个partition上。

下面我们将详细说明stage以及stage的划分依据。

stage 概念、划分

貌似没有十分明确的概念来十分清楚地说明spark stage究竟是什么?这里只记录从众多优秀的博客里提取的直言片语,以及本人的一点见解。

stage的划分是以shuffle操作作为边界的。也就是说某个action导致了shuffle操作,就会划分出两个stage。

stage的划分在RDD的论文中也有详细介绍,简单的说是以shuffle和result这两种类型来划分。在spark中有两类task,一类是shuffle map task,一类是result task,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也依次为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。比如 rdd.parallize(1 to 10).foreach(println) 这个操作没有shuffle,直接就输出了,那么只有它的task是resultTask,stage也只有一个;如果是rdd.map(x => (x, 1)).reduceByKey( + ).foreach(println), 这个job因为有reduce,所以有一个shuffle过程,那么reduceByKey之前的是一个stage,执行shuffleMapTask,输出shuffle所需的数据,reduceByKey到最后是一个stage,直接就输出结果了。如果job中有多次shuffle,那么每个shuffle之前都是一个stage。

在DAGScheduler中,会将每个job划分成多个stage,每个stage会创建一批task并且计算task的最佳位置,一个task对应一个partition。DAGScheduler的stage划分算法如下:它会从触发action操作的那个RDD开始往前推,首先会为最后一个RDD创建一个stage,然后往前倒推的时候,如果发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD创建一个新的stage,那个RDD就是新的stage的最后一个RDD,然后依次类推,继续往前倒推,根据窄依赖或者宽依赖进行stage的划分,直到所有的RDD全部遍历完成为止。

spark任务会根据RDD之间的依赖关系,形成一个DAG有向无环图,DAG会提交给DAGScheduler,DAGScheduler会把DAG划分成相互依赖的多个stage,划分stage的依据就是RDD之间的宽窄依赖。遇到宽依赖就划分stage,每个stage包含一个或多个task任务。然后将这些task以task set的形式交给TaskScheduler运行。stage是由一组并行的task组成

窄依赖

父RDD和子RDD partition之间的关系是一对一的。不会有shuffle的产生。父RDD的一个分区去到子RDD的一个分区中。

宽依赖

父RDD与子RDD partition之间的关系是一对多的。会有shuffle的产生。父RDD的一个分区去到子RDD的不同分区里面。

其实区分宽窄依赖,主要就是看父RDD的一个partition的流向,要是流向一个的话就是窄依赖,流向多个的话就是宽依赖。以WordCount为例,看图理解:

spark stage

运行流程之:task

task是stage下的一个任务执行单元,一般来说,一个rdd有多少个partition,就会有多少个task,因为每一个task只是处理一个partition上的数据。

正文到此结束