Spark是基于內(nèi)存的分布式計(jì)算引擎,以處理的高效和穩(wěn)定著稱。然而在實(shí)際的應(yīng)用開發(fā)過程中,開發(fā)者還是會遇到種種問題,其中一大類就是和性能相關(guān)。在本文中,筆者將結(jié)合自身實(shí)踐,談?wù)勅绾伪M可能地提高應(yīng)用程序性能。
分布式計(jì)算引擎在調(diào)優(yōu)方面有四個主要關(guān)注方向,分別是CPU、內(nèi)存、網(wǎng)絡(luò)開銷和I/O,其具體調(diào)優(yōu)目標(biāo)如下:
- 提高CPU利用率。
- 避免OOM。
- 降低網(wǎng)絡(luò)開銷。
- 減少I/O操作。
第1章 數(shù)據(jù)傾斜
數(shù)據(jù)傾斜意味著某一個或某幾個Partition中的數(shù)據(jù)量特別的大,這意味著完成針對這幾個Partition的計(jì)算需要耗費(fèi)相當(dāng)長的時間。
如 果大量數(shù)據(jù)集中到某一個Partition,那么這個Partition在計(jì)算的時候就會成為瓶頸。圖1是Spark應(yīng)用程序執(zhí)行并發(fā)的示意圖,在 Spark中,同一個應(yīng)用程序的不同Stage是串行執(zhí)行的,而同一Stage中的不同Task可以并發(fā)執(zhí)行,Task數(shù)目由Partition數(shù)來決 定,如果某一個Partition的數(shù)據(jù)量特別大,則相應(yīng)的task完成時間會特別長,由此導(dǎo)致接下來的Stage無法開始,整個Job完成的時間就會非 常長。
要避免數(shù)據(jù)傾斜的出現(xiàn),一種方法就是選擇合適的key,或者是自己定義相關(guān)的partitioner。在Spark中Block使用 了ByteBuffer來存儲數(shù)據(jù),而ByteBuffer能夠存儲的最大數(shù)據(jù)量不超過2GB。如果某一個key有大量的數(shù)據(jù),那么在調(diào)用cache或 persist函數(shù)時就會碰到spark-1476這個異常。
下面列出的這些API會導(dǎo)致Shuffle操作,是數(shù)據(jù)傾斜可能發(fā)生的關(guān)鍵點(diǎn)所在
1. groupByKey
2. reduceByKey
3. aggregateByKey
4. sortByKey
5. join
6. cogroup
7. cartesian
8. coalesce
9. repartition
10. repartitionAndSortWithinPartitions
def rdd: RDD[T] } // TODO View bounds are deprecated, should use context bounds // Might need to change ClassManifest for ClassTag in spark 1.0.0 case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] { // Here we use a single Long to try to ensure the sort is balanced, // but for really large dataset, we may want to consider // using a tuple of many Longs or even a GUID def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] = rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey() .grouped(numPartitions).map(t => (t._1._1, t._2)) } case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] { def grouped(size: Int): RDD[T] = { // TODO Version where withIndex is cached val withIndex = rdd.mapPartitions(_.zipWithIndex) val startValues = withIndex.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toIterable.last))).toArray().toList .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L) withIndex.mapPartitionsWithIndex((i, iter) => iter.map { case (value, index) => (startValues(i) + index.toLong, value) }) .partitionBy(new Partitioner { def numPartitions: Int = size def getPartition(key: Any): Int = (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt }) .map(_._2) } }
定義隱式的轉(zhuǎn)換
implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] = new DemoRDD[T](rdd) implicit def toDemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd) implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd }
在spark-shell中就可以使用了
import RDDConversions._ yourRdd.grouped(5)
第2章 減少網(wǎng)絡(luò)通信開銷
Spark 的Shuffle過程非常消耗資源,Shuffle過程意味著在相應(yīng)的計(jì)算節(jié)點(diǎn),要先將計(jì)算結(jié)果存儲到磁盤,后續(xù)的Stage需要將上一個Stage的結(jié) 果再次讀入。數(shù)據(jù)的寫入和讀取意味著Disk I/O操作,與內(nèi)存操作相比,Disk I/O操作是非常低效的。
使用iostat來查看disk i/o的使用情況,disk i/o操作頻繁一般會伴隨著cpu load很高。
如果數(shù)據(jù)和計(jì)算節(jié)點(diǎn)都在同一臺機(jī)器上,那么可以避免網(wǎng)絡(luò)開銷,否則還要加上相應(yīng)的網(wǎng)絡(luò)開銷。 使用iftop來查看網(wǎng)絡(luò)帶寬使用情況,看哪幾個節(jié)點(diǎn)之間有大量的網(wǎng)絡(luò)傳輸。
圖2是Spark節(jié)點(diǎn)間數(shù)據(jù)傳輸?shù)氖疽鈭D,Spark Task的計(jì)算函數(shù)是通過Akka通道由Driver發(fā)送到Executor上,而Shuffle的數(shù)據(jù)則是通過Netty網(wǎng)絡(luò)接口來實(shí)現(xiàn)。由于Akka 通道中參數(shù)spark.akka.framesize決定了能夠傳輸消息的最大值,所以應(yīng)該避免在Spark Task中引入超大的局部變量。
第1節(jié) 選擇合適的并發(fā)數(shù)
為了提高Spark應(yīng)用程序的效率,盡可能的提升CPU的利用率。并發(fā)數(shù)應(yīng)該是可用CPU物理核數(shù)的兩倍。在這里,并發(fā)數(shù)過低,CPU得不到充分的利用,并發(fā)數(shù)過大,由于spark是每一個task都要分發(fā)到計(jì)算結(jié)點(diǎn),所以任務(wù)啟動的開銷會上升。
并發(fā)數(shù)的修改,通過配置參數(shù)來改變spark.default.parallelism,如果是sql的話,可能通過修改spark.sql.shuffle.partitions來修改。
第1項(xiàng) Repartition vs. Coalesce
repartition和coalesce都能實(shí)現(xiàn)數(shù)據(jù)分區(qū)的動態(tài)調(diào)整,但需要注意的是repartition會導(dǎo)致shuffle操作,而coalesce不會。
第2節(jié) reduceByKey vs. groupBy
groupBy操作應(yīng)該盡可能的避免,第一是有可能造成大量的網(wǎng)絡(luò)開銷,第二是可能導(dǎo)致OOM。以WordCount為例來演示reduceByKey和groupBy的差異
reduceByKey sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).reduceByKey(_ + _)
Shuffle過程如圖2所示
groupByKey sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).groupByKey.map(r=>(r._1,r._2.sum))
建議: 盡可能使用reduceByKey, aggregateByKey, foldByKey和combineByKey
假設(shè)有一RDD如下所示,求每個key的均值
val data = sc.parallelize( List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )
方法一:reduceByKey
data.map(r=>(r._1, (r.2,1))).reduceByKey((a,b)=>(a._1 + b._1, a._2 + b._2)).map(r=>(r._1,(r._2._1/r._2._2)).foreach(println)
方法二:combineByKey
data.combineByKey(value=>(value,1), (x:(Double, Int), value:Double)=> (x._1+value, x._2 + 1), (x:(Double,Int), y:(Double, Int))=>(x._1 + y._1, x._2 + y._2))
第3節(jié) BroadcastHashJoin vs. ShuffleHashJoin
在Join過程中,經(jīng)常會遇到大表和小表的join. 為了提高效率可以使用BroadcastHashJoin, 預(yù)先將小表的內(nèi)容廣播到各個Executor, 這樣將避免針對小表的Shuffle過程,從而極大的提高運(yùn)行效率。
其實(shí)BroadCastHashJoin核心就是利用了BroadCast函數(shù),如果理解清楚broadcast的優(yōu)點(diǎn),就能比較好的明白BroadcastHashJoin的優(yōu)勢所在。
以下是一個簡單使用broadcast的示例程序。
val lst = 1 to 100 toList val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2) val broadcastLst = sc.broadcast(lst) exampleRDD.filter(i=>broadcastLst.valuecontains(i)).collect.foreach(println)
第4節(jié) map vs. mapPartitions
有時需要將計(jì)算結(jié)果存儲到外部數(shù)據(jù)庫,勢必會建立到外部數(shù)據(jù)庫的連接。應(yīng)該盡可能的讓更多的元素共享同一個數(shù)據(jù)連接而不是每一個元素的處理時都去建立數(shù)據(jù)庫連接。
在這種情況下,mapPartitions和foreachPartitons將比map操作高效的多。
第5節(jié) 數(shù)據(jù)就地讀取
移動計(jì)算的開銷遠(yuǎn)遠(yuǎn)低于移動數(shù)據(jù)的開銷。
Spark中每個Task都需要相應(yīng)的輸入數(shù)據(jù),因此輸入數(shù)據(jù)的位置對于Task的性能變得很重要。按照數(shù)據(jù)獲取的速度來區(qū)分,由快到慢分別是:
1.PROCESS_LOCAL
2.NODE_LOCAL
3.RACK_LOCAL
Spark在Task執(zhí)行的時候會盡優(yōu)先考慮最快的數(shù)據(jù)獲取方式,如果想盡可能的在更多的機(jī)器上啟動Task,那么可以通過調(diào)低spark.locality.wait的值來實(shí)現(xiàn), 默認(rèn)值是3s。
除 了HDFS,Spark能夠支持的數(shù)據(jù)源越來越多,如Cassandra, HBase,MongoDB等知名的NoSQL數(shù)據(jù)庫,隨著Elasticsearch的日漸興起,spark和elasticsearch組合起來提供 高速的查詢解決方案也成為一種有益的嘗試。
上述提到的外部數(shù)據(jù)源面臨的一個相同問題就是如何讓spark快速讀取其中的數(shù)據(jù), 盡可能的將計(jì)算結(jié)點(diǎn)和數(shù)據(jù)結(jié)點(diǎn)部署在一起是達(dá)到該目標(biāo)的基本方法,比如在部署Hadoop集群的時候,可以將HDFS的DataNode和Spark Worker共享一臺機(jī)器。
以cassandra為例,如果Spark的部署和Cassandra的機(jī)器有部分重疊,那么在讀取Cassandra中數(shù)據(jù)的時候,通過調(diào)低spark.locality.wait就可以在沒有部署Cassandra的機(jī)器上啟動Spark Task。
對于Cassandra, 可以在部署Cassandra的機(jī)器上部署Spark Worker,需要注意的是Cassandra的compaction操作會極大的消耗CPU,因此在為Spark Worker配置CPU核數(shù)時,需要將這些因素綜合在一起進(jìn)行考慮。
這一部分的代碼邏輯可以參考源碼TaskSetManager::addPendingTask
private def addPendingTask(index: Int, readding: Boolean = false) { // Utility method that adds `index` to a list only if readding=false or it's not already there def addTo(list: ArrayBuffer[Int]) { if (!readding || !list.contains(index)) { list += index } } for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer)) case e: HDFSCacheTaskLocation => { val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { case Some(set) => { for (e <- set) { addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer)) } logInfo(s"Pending task $index has a cached location at ${e.host} " + ", where there are executors " + set.mkString(",")) } case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + ", but there are no executors alive there.") } } case _ => Unit } addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) for (rack <- sched.getRackForHost(loc.host)) { addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) } } if (tasks(index).preferredLocations == Nil) { addTo(pendingTasksWithNoPrefs) } if (!readding) { allPendingTasks += index // No point scanning this whole list to find the old task there } }
如果準(zhǔn)備讓spark支持新的存儲源,進(jìn)而開發(fā)相應(yīng)的RDD,與位置相關(guān)的部分就是自定義getPreferredLocations函數(shù),以elasticsearch-hadoop中的EsRDD為例,其代碼實(shí)現(xiàn)如下。
override def getPreferredLocations(split: Partition): Seq[String] = { val esSplit = split.asInstanceOf[EsPartition] val ip = esSplit.esPartition.nodeIp if (ip != null) Seq(ip) else Nil }
第6節(jié) 序列化
使用好的序列化算法能夠提高運(yùn)行速度,同時能夠減少內(nèi)存的使用。
Spark在Shuffle的時候要將數(shù)據(jù)先存儲到磁盤中,存儲的內(nèi)容是經(jīng)過序列化的。序列化的過程牽涉到兩大基本考慮的因素,一是序列化的速度,二是序列化后內(nèi)容所占用的大小。
kryoSerializer與默認(rèn)的javaSerializer相比,在序列化速度和序列化結(jié)果的大小方面都具有極大的優(yōu)勢。所以建議在應(yīng)用程序配置中使用KryoSerializer.
spark.serializer org.apache.spark.serializer.KryoSerializer
默認(rèn)的cache沒有對緩存的對象進(jìn)行序列化,使用的StorageLevel是MEMORY_ONLY,這意味著要占用比較大的內(nèi)存。可以通過指定persist中的參數(shù)來對緩存內(nèi)容進(jìn)行序列化。
exampleRDD.persist(MEMORY_ONLY_SER)
需要特別指出的是persist函數(shù)是等到j(luò)ob執(zhí)行的時候才會將數(shù)據(jù)緩存起來,屬于延遲執(zhí)行; 而unpersist函數(shù)則是立即執(zhí)行,緩存會被立即清除。
作者簡介:許鵬, 《Apache Spark源碼剖析》作者,關(guān)注于大數(shù)據(jù)實(shí)時搜索和實(shí)時流數(shù)據(jù)處理,對elasticsearch, storm及drools多有研究,現(xiàn)就職于攜程。