Spark是基于內存的分布式計算引擎,以處理的高效和穩定著稱。然而在實際的應用開發過程中,開發者還是會遇到種種問題,其中一大類就是和性能相關。在本文中,筆者將結合自身實踐,談談如何盡可能地提高應用程序性能。
分布式計算引擎在調優方面有四個主要關注方向,分別是CPU、內存、網絡開銷和I/O,其具體調優目標如下:
- 提高CPU利用率。
- 避免OOM。
- 降低網絡開銷。
- 減少I/O操作。
第1章 數據傾斜
數據傾斜意味著某一個或某幾個Partition中的數據量特別的大,這意味著完成針對這幾個Partition的計算需要耗費相當長的時間。
如 果大量數據集中到某一個Partition,那么這個Partition在計算的時候就會成為瓶頸。圖1是Spark應用程序執行并發的示意圖,在 Spark中,同一個應用程序的不同Stage是串行執行的,而同一Stage中的不同Task可以并發執行,Task數目由Partition數來決 定,如果某一個Partition的數據量特別大,則相應的task完成時間會特別長,由此導致接下來的Stage無法開始,整個Job完成的時間就會非 常長。
要避免數據傾斜的出現,一種方法就是選擇合適的key,或者是自己定義相關的partitioner。在Spark中Block使用 了ByteBuffer來存儲數據,而ByteBuffer能夠存儲的最大數據量不超過2GB。如果某一個key有大量的數據,那么在調用cache或 persist函數時就會碰到spark-1476這個異常。
下面列出的這些API會導致Shuffle操作,是數據傾斜可能發生的關鍵點所在
1. groupByKey
2. reduceByKey
3. aggregateByKey
4. sortByKey
5. join
6. cogroup
7. cartesian
8. coalesce
9. repartition
10. repartitionAndSortWithinPartitions
def rdd: RDD[T] } // TODO View bounds are deprecated, should use context bounds // Might need to change ClassManifest for ClassTag in spark 1.0.0 case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] { // Here we use a single Long to try to ensure the sort is balanced, // but for really large dataset, we may want to consider // using a tuple of many Longs or even a GUID def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] = rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey() .grouped(numPartitions).map(t => (t._1._1, t._2)) } case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] { def grouped(size: Int): RDD[T] = { // TODO Version where withIndex is cached val withIndex = rdd.mapPartitions(_.zipWithIndex) val startValues = withIndex.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.toIterable.last))).toArray().toList .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L) withIndex.mapPartitionsWithIndex((i, iter) => iter.map { case (value, index) => (startValues(i) + index.toLong, value) }) .partitionBy(new Partitioner { def numPartitions: Int = size def getPartition(key: Any): Int = (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt }) .map(_._2) } }
定義隱式的轉換
implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] = new DemoRDD[T](rdd) implicit def toDemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd) implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd }
在spark-shell中就可以使用了
import RDDConversions._ yourRdd.grouped(5)
第2章 減少網絡通信開銷
Spark 的Shuffle過程非常消耗資源,Shuffle過程意味著在相應的計算節點,要先將計算結果存儲到磁盤,后續的Stage需要將上一個Stage的結 果再次讀入。數據的寫入和讀取意味著Disk I/O操作,與內存操作相比,Disk I/O操作是非常低效的。
使用iostat來查看disk i/o的使用情況,disk i/o操作頻繁一般會伴隨著cpu load很高。
如果數據和計算節點都在同一臺機器上,那么可以避免網絡開銷,否則還要加上相應的網絡開銷。 使用iftop來查看網絡帶寬使用情況,看哪幾個節點之間有大量的網絡傳輸。
圖2是Spark節點間數據傳輸的示意圖,Spark Task的計算函數是通過Akka通道由Driver發送到Executor上,而Shuffle的數據則是通過Netty網絡接口來實現。由于Akka 通道中參數spark.akka.framesize決定了能夠傳輸消息的最大值,所以應該避免在Spark Task中引入超大的局部變量。
第1節 選擇合適的并發數
為了提高Spark應用程序的效率,盡可能的提升CPU的利用率。并發數應該是可用CPU物理核數的兩倍。在這里,并發數過低,CPU得不到充分的利用,并發數過大,由于spark是每一個task都要分發到計算結點,所以任務啟動的開銷會上升。
并發數的修改,通過配置參數來改變spark.default.parallelism,如果是sql的話,可能通過修改spark.sql.shuffle.partitions來修改。
第1項 Repartition vs. Coalesce
repartition和coalesce都能實現數據分區的動態調整,但需要注意的是repartition會導致shuffle操作,而coalesce不會。
第2節 reduceByKey vs. groupBy
groupBy操作應該盡可能的避免,第一是有可能造成大量的網絡開銷,第二是可能導致OOM。以WordCount為例來演示reduceByKey和groupBy的差異
reduceByKey sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).reduceByKey(_ + _)
Shuffle過程如圖2所示
groupByKey sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).groupByKey.map(r=>(r._1,r._2.sum))
建議: 盡可能使用reduceByKey, aggregateByKey, foldByKey和combineByKey
假設有一RDD如下所示,求每個key的均值
val data = sc.parallelize( List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )
方法一:reduceByKey
data.map(r=>(r._1, (r.2,1))).reduceByKey((a,b)=>(a._1 + b._1, a._2 + b._2)).map(r=>(r._1,(r._2._1/r._2._2)).foreach(println)
方法二:combineByKey
data.combineByKey(value=>(value,1), (x:(Double, Int), value:Double)=> (x._1+value, x._2 + 1), (x:(Double,Int), y:(Double, Int))=>(x._1 + y._1, x._2 + y._2))
第3節 BroadcastHashJoin vs. ShuffleHashJoin
在Join過程中,經常會遇到大表和小表的join. 為了提高效率可以使用BroadcastHashJoin, 預先將小表的內容廣播到各個Executor, 這樣將避免針對小表的Shuffle過程,從而極大的提高運行效率。
其實BroadCastHashJoin核心就是利用了BroadCast函數,如果理解清楚broadcast的優點,就能比較好的明白BroadcastHashJoin的優勢所在。
以下是一個簡單使用broadcast的示例程序。
val lst = 1 to 100 toList val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2) val broadcastLst = sc.broadcast(lst) exampleRDD.filter(i=>broadcastLst.valuecontains(i)).collect.foreach(println)
第4節 map vs. mapPartitions
有時需要將計算結果存儲到外部數據庫,勢必會建立到外部數據庫的連接。應該盡可能的讓更多的元素共享同一個數據連接而不是每一個元素的處理時都去建立數據庫連接。
在這種情況下,mapPartitions和foreachPartitons將比map操作高效的多。
第5節 數據就地讀取
移動計算的開銷遠遠低于移動數據的開銷。
Spark中每個Task都需要相應的輸入數據,因此輸入數據的位置對于Task的性能變得很重要。按照數據獲取的速度來區分,由快到慢分別是:
1.PROCESS_LOCAL
2.NODE_LOCAL
3.RACK_LOCAL
Spark在Task執行的時候會盡優先考慮最快的數據獲取方式,如果想盡可能的在更多的機器上啟動Task,那么可以通過調低spark.locality.wait的值來實現, 默認值是3s。
除 了HDFS,Spark能夠支持的數據源越來越多,如Cassandra, HBase,MongoDB等知名的NoSQL數據庫,隨著Elasticsearch的日漸興起,spark和elasticsearch組合起來提供 高速的查詢解決方案也成為一種有益的嘗試。
上述提到的外部數據源面臨的一個相同問題就是如何讓spark快速讀取其中的數據, 盡可能的將計算結點和數據結點部署在一起是達到該目標的基本方法,比如在部署Hadoop集群的時候,可以將HDFS的DataNode和Spark Worker共享一臺機器。
以cassandra為例,如果Spark的部署和Cassandra的機器有部分重疊,那么在讀取Cassandra中數據的時候,通過調低spark.locality.wait就可以在沒有部署Cassandra的機器上啟動Spark Task。
對于Cassandra, 可以在部署Cassandra的機器上部署Spark Worker,需要注意的是Cassandra的compaction操作會極大的消耗CPU,因此在為Spark Worker配置CPU核數時,需要將這些因素綜合在一起進行考慮。
這一部分的代碼邏輯可以參考源碼TaskSetManager::addPendingTask
private def addPendingTask(index: Int, readding: Boolean = false) { // Utility method that adds `index` to a list only if readding=false or it's not already there def addTo(list: ArrayBuffer[Int]) { if (!readding || !list.contains(index)) { list += index } } for (loc <- tasks(index).preferredLocations) { loc match { case e: ExecutorCacheTaskLocation => addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer)) case e: HDFSCacheTaskLocation => { val exe = sched.getExecutorsAliveOnHost(loc.host) exe match { case Some(set) => { for (e <- set) { addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer)) } logInfo(s"Pending task $index has a cached location at ${e.host} " + ", where there are executors " + set.mkString(",")) } case None => logDebug(s"Pending task $index has a cached location at ${e.host} " + ", but there are no executors alive there.") } } case _ => Unit } addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer)) for (rack <- sched.getRackForHost(loc.host)) { addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer)) } } if (tasks(index).preferredLocations == Nil) { addTo(pendingTasksWithNoPrefs) } if (!readding) { allPendingTasks += index // No point scanning this whole list to find the old task there } }
如果準備讓spark支持新的存儲源,進而開發相應的RDD,與位置相關的部分就是自定義getPreferredLocations函數,以elasticsearch-hadoop中的EsRDD為例,其代碼實現如下。
override def getPreferredLocations(split: Partition): Seq[String] = { val esSplit = split.asInstanceOf[EsPartition] val ip = esSplit.esPartition.nodeIp if (ip != null) Seq(ip) else Nil }
第6節 序列化
使用好的序列化算法能夠提高運行速度,同時能夠減少內存的使用。
Spark在Shuffle的時候要將數據先存儲到磁盤中,存儲的內容是經過序列化的。序列化的過程牽涉到兩大基本考慮的因素,一是序列化的速度,二是序列化后內容所占用的大小。
kryoSerializer與默認的javaSerializer相比,在序列化速度和序列化結果的大小方面都具有極大的優勢。所以建議在應用程序配置中使用KryoSerializer.
spark.serializer org.apache.spark.serializer.KryoSerializer
默認的cache沒有對緩存的對象進行序列化,使用的StorageLevel是MEMORY_ONLY,這意味著要占用比較大的內存。可以通過指定persist中的參數來對緩存內容進行序列化。
exampleRDD.persist(MEMORY_ONLY_SER)
需要特別指出的是persist函數是等到job執行的時候才會將數據緩存起來,屬于延遲執行; 而unpersist函數則是立即執行,緩存會被立即清除。
作者簡介:許鵬, 《Apache Spark源碼剖析》作者,關注于大數據實時搜索和實時流數據處理,對elasticsearch, storm及drools多有研究,現就職于攜程。