Spark中RDD算子的foreachRDD使用注意

闭包

我们的项目需要从 Kafka 消费消息,在对消息进行处理后,再写入到 ActiveMQ,以作为外部系统的数据源。基于这样的逻辑,我们就需要通过 Spark Streaming 读取 Kafka 的消息,获得的结果其实是一个 RDD。DStream 提供了foreachRDD(func)方法,通过该方法可以遍历 RDD 的每条记录,然后再通过 ActiveMQ 的 Producer 将处理后的消息发送到 ActiveMQ。

要将消息发送到 ActiveMQ,就需要建立与消息队列的连接。在传统编程实现中,最直观的做法一定是将获取连接的代码放在foreachRDD(func)方法之外,如此可以避免不必要的资源消耗与时间消耗。例如:

dstream.foreachRDD { rdd =>
  val producer = createProducer()
  rdd.foreach { message =>
    producer.send(message)
  }
}

def createProducer(): MessageProducer = {
  val conn = createActiveMQConnection()
  val session = sessionFrom(conn)
  producerFrom(session)
}


但是,这一做法在 Spark Streaming 中却行不通。原因在于:foreachRDD(func)方法中的 func 是在调用 Spark 流式计算程序的 Driver 进程中执行的,而遍历得到的 RDD 中的操作却是在 worker 中执行

dstream.foreachRDD { rdd =>
  val producer = createProducer()  //在driver进程执行
  rdd.foreach { message =>
    producer.send(message)  //在worker进程执行
  }
}


这就需要将获得的对象(例子中包括了 Connection、Session 和 Producer)进行序列化,使其能够从 driver 发送到 worker。然而,连接等于资源相关的对象往往无法支持序列化,也无法在 worker 正确的初始化。

为了避免这种情况,一种做法是将前面的createProducer()方法搬到内部的rdd.foreach(fn)中来。可是,创建一个 connection 对象往往既费时间又费资源,针对每个 RDD 不停地创建连接,然后又关闭连接,会影响到整个系统的吞吐量和性能。

解决方案是使用foreachPartition(func)方法,通过它创建一个单独的 connection 对象,然后在 RDD 分区里使用这个连接对象将所有数据发送出去:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    sendToActiveMQ { producer =>
      partitionOfRecords.foreach(record => producer.send(record))
    }
  }
}

def sendToActiveMQ(send: MessageProducer => Unit):Unit => {
  val conn = createActiveMQConnection()
  val session = sessionFrom(conn)
  val producer = producerFrom(session)
  send(producer)
  conn.close()
}


为了避免过多的创建和释放 connection 对象,还有一个更好的方案是使用连接池。由于我在前面的代码已经将连接创建与关闭提取出专门的方法,因此只需要修改前面的sendToActiveMQ()即可:

def sendToActiveMQ(send: MessageProducer => Unit):Unit => {
  val conn = ActiveMQConnectionPool.getConnection()
  val session = sessionFrom(conn)
  val producer = producerFrom(session)
  send(producer)
  ActiveMQConnectionPool.returnConnnection(conn)
}


Spark 这种 Driver 与 Worker 互相协作的分布式架构,与单节点的编程模型存在细微差异。开发时,稍不注意就可能出现问题。当然,面对这些问题,最根本的还是要从 Spark 的设计本质来理解,问题也就迎刃而解了。

单例模式

但是,以上更多的是关于匿名函数的闭包方式的讨论,第二种单例模式的处理方式与以上所述存在一些区别。
在一个 Spark 应用的执行过程中,Driver 和 Worker 是两个重要角色。Driver 程序是应用逻辑执行的起点,负责作业的调度,即 Task 任务的分发,而多个 Worker 用来管理

计算节点和创建 Executor 并行处理任务。在执行阶段,Driver 会将 Task 和 Task 所依赖的 file 和 jar 序列化后传递给对应的 Worker 机器,同时 Executor 对相应数据分区的任务进行处理。注意,所有的 Executor 上都会获取一份程序的 jar 包。

单例模式是一种常用的设计模式,但是在集群模式下的 Spark 中使用单例模式会引发一些错误。我们用下面代码作例子,解读在 Spark 中使用单例模式遇到的问题。

object Example{
  var instance:Example = new Example("default_name");
  def getInstance():Example = {
    return instance
  }
  def init(name:String){
    instance = new Example(name)
  }
}
class Example private(name1:String) extends  Serializable{
  var name = name1
}
 
object Main{
  def main(args:Array[String]) = {
    Example.init("new_name")
    val sc =  new SparkContext(newSparkConf().setAppName("test"))
 
    val rdd = sc.parallelize(1 to 10, 3)
    rdd.map(x=>{
      x + "_"+ Example.getInstance().name
    }).collect.foreach(println)
  }
}

本地运行的结果是:

1_new_name
2_new_name
3_new_name
4_new_name
7_new_name
5_new_name
8_new_name
6_new_name
10_new_name
9_new_name
12_new_name
11_new_name

注释掉 setMaster("local[5]") 部分,提交到 Spark 集群运行,得到的结果是:

1_default_name
2_default_name
3_default_name
4_default_name
5_default_name
6_default_name
7_default_name
8_default_name
9_default_name
10_default_name
11_default_name
12_default_name

注意:我们在 rdd.map 中使用了 Exampleobject,但是并没有对 Example 可序列化做任何处理,但是程序并没有抛出不可序列化异常,显而易见,当 Spark 准备闭包的时候,并没有将 Example 整合对象序列化打包传递到 worker 端执行。

这是由什么原因导致的呢?Spark 执行算子之前,会将算子需要东西准备好并打包(这就是闭包的概念),分发到不同的 executor,但这里不包括类。类存在 jar 包中,随着 jar 包分发到不同的 executors 中。当不同的 executors 执行算子需要类时,直接从分发的 jar 包取得。这时候在 driver 上对类的静态变量进行改变,并不能影响 executors 中的类。拿上面的程序做例子,jar 包存的 Example.instance = newExample("default_name"),分发到不同的 executors。这时候不同 executors 中 Example.getInstance().name 等于 "default_name"。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 228,786评论 6 534
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 98,656评论 3 419
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 176,697评论 0 379
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,098评论 1 314
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 71,855评论 6 410
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,254评论 1 324
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,322评论 3 442
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 42,473评论 0 289
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,014评论 1 335
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 40,833评论 3 355
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,016评论 1 371
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 38,568评论 5 362
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,273评论 3 347
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 34,680评论 0 26
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 35,946评论 1 288
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 51,730评论 3 393
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,006评论 2 374