posts - 495,comments - 227,trackbacks - 0

          Spark是基于內(nèi)存的分布式計(jì)算引擎,以處理的高效和穩(wěn)定著稱。然而在實(shí)際的應(yīng)用開發(fā)過程中,開發(fā)者還是會遇到種種問題,其中一大類就是和性能相關(guān)。在本文中,筆者將結(jié)合自身實(shí)踐,談?wù)勅绾伪M可能地提高應(yīng)用程序性能。

          分布式計(jì)算引擎在調(diào)優(yōu)方面有四個主要關(guān)注方向,分別是CPU、內(nèi)存、網(wǎng)絡(luò)開銷和I/O,其具體調(diào)優(yōu)目標(biāo)如下:

          1. 提高CPU利用率。
          2. 避免OOM。
          3. 降低網(wǎng)絡(luò)開銷。
          4. 減少I/O操作。

          第1章 數(shù)據(jù)傾斜

          數(shù)據(jù)傾斜意味著某一個或某幾個Partition中的數(shù)據(jù)量特別的大,這意味著完成針對這幾個Partition的計(jì)算需要耗費(fèi)相當(dāng)長的時間。

          如 果大量數(shù)據(jù)集中到某一個Partition,那么這個Partition在計(jì)算的時候就會成為瓶頸。圖1是Spark應(yīng)用程序執(zhí)行并發(fā)的示意圖,在 Spark中,同一個應(yīng)用程序的不同Stage是串行執(zhí)行的,而同一Stage中的不同Task可以并發(fā)執(zhí)行,Task數(shù)目由Partition數(shù)來決 定,如果某一個Partition的數(shù)據(jù)量特別大,則相應(yīng)的task完成時間會特別長,由此導(dǎo)致接下來的Stage無法開始,整個Job完成的時間就會非 常長。

          要避免數(shù)據(jù)傾斜的出現(xiàn),一種方法就是選擇合適的key,或者是自己定義相關(guān)的partitioner。在Spark中Block使用 了ByteBuffer來存儲數(shù)據(jù),而ByteBuffer能夠存儲的最大數(shù)據(jù)量不超過2GB。如果某一個key有大量的數(shù)據(jù),那么在調(diào)用cache或 persist函數(shù)時就會碰到spark-1476這個異常。

          下面列出的這些API會導(dǎo)致Shuffle操作,是數(shù)據(jù)傾斜可能發(fā)生的關(guān)鍵點(diǎn)所在
          1. groupByKey
          2. reduceByKey
          3. aggregateByKey
          4. sortByKey
          5. join
          6. cogroup
          7. cartesian
          8. coalesce
          9. repartition
          10. repartitionAndSortWithinPartitions

          圖片描述

          圖1: Spark任務(wù)并發(fā)模型

            def rdd: RDD[T] }  // TODO View bounds are deprecated, should use context bounds // Might need to change ClassManifest for ClassTag in spark 1.0.0 case class DemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](   rdd: RDD[(K, V)]) extends RDDWrapper[(K, V)] {   // Here we use a single Long to try to ensure the sort is balanced,    // but for really large dataset, we may want to consider   // using a tuple of many Longs or even a GUID   def sortByKeyGrouped(numPartitions: Int): RDD[(K, V)] =     rdd.map(kv => ((kv._1, Random.nextLong()), kv._2)).sortByKey()     .grouped(numPartitions).map(t => (t._1._1, t._2)) }  case class DemoRDD[T: ClassManifest](rdd: RDD[T]) extends RDDWrapper[T] {   def grouped(size: Int): RDD[T] = {     // TODO Version where withIndex is cached     val withIndex = rdd.mapPartitions(_.zipWithIndex)      val startValues =       withIndex.mapPartitionsWithIndex((i, iter) =>          Iterator((i, iter.toIterable.last))).toArray().toList       .sortBy(_._1).map(_._2._2.toLong).scan(-1L)(_ + _).map(_ + 1L)      withIndex.mapPartitionsWithIndex((i, iter) => iter.map {       case (value, index) => (startValues(i) + index.toLong, value)     })     .partitionBy(new Partitioner {       def numPartitions: Int = size       def getPartition(key: Any): Int =          (key.asInstanceOf[Long] * numPartitions.toLong / startValues.last).toInt     })     .map(_._2)   } }

          定義隱式的轉(zhuǎn)換

            implicit def toDemoRDD[T: ClassManifest](rdd: RDD[T]): DemoRDD[T] =      new DemoRDD[T](rdd)   implicit def toDemoPairRDD[K <% Ordered[K] : ClassManifest, V: ClassManifest](     rdd: RDD[(K, V)]): DemoPairRDD[K, V] = DemoPairRDD(rdd)   implicit def toRDD[T](rdd: RDDWrapper[T]): RDD[T] = rdd.rdd }

          在spark-shell中就可以使用了

          import RDDConversions._  yourRdd.grouped(5)

          第2章 減少網(wǎng)絡(luò)通信開銷

          Spark 的Shuffle過程非常消耗資源,Shuffle過程意味著在相應(yīng)的計(jì)算節(jié)點(diǎn),要先將計(jì)算結(jié)果存儲到磁盤,后續(xù)的Stage需要將上一個Stage的結(jié) 果再次讀入。數(shù)據(jù)的寫入和讀取意味著Disk I/O操作,與內(nèi)存操作相比,Disk I/O操作是非常低效的。

          使用iostat來查看disk i/o的使用情況,disk i/o操作頻繁一般會伴隨著cpu load很高。

          如果數(shù)據(jù)和計(jì)算節(jié)點(diǎn)都在同一臺機(jī)器上,那么可以避免網(wǎng)絡(luò)開銷,否則還要加上相應(yīng)的網(wǎng)絡(luò)開銷。 使用iftop來查看網(wǎng)絡(luò)帶寬使用情況,看哪幾個節(jié)點(diǎn)之間有大量的網(wǎng)絡(luò)傳輸。
          圖2是Spark節(jié)點(diǎn)間數(shù)據(jù)傳輸?shù)氖疽鈭D,Spark Task的計(jì)算函數(shù)是通過Akka通道由Driver發(fā)送到Executor上,而Shuffle的數(shù)據(jù)則是通過Netty網(wǎng)絡(luò)接口來實(shí)現(xiàn)。由于Akka 通道中參數(shù)spark.akka.framesize決定了能夠傳輸消息的最大值,所以應(yīng)該避免在Spark Task中引入超大的局部變量。

          圖片描述

          圖2: Spark節(jié)點(diǎn)間的數(shù)據(jù)傳輸

          第1節(jié) 選擇合適的并發(fā)數(shù)

          為了提高Spark應(yīng)用程序的效率,盡可能的提升CPU的利用率。并發(fā)數(shù)應(yīng)該是可用CPU物理核數(shù)的兩倍。在這里,并發(fā)數(shù)過低,CPU得不到充分的利用,并發(fā)數(shù)過大,由于spark是每一個task都要分發(fā)到計(jì)算結(jié)點(diǎn),所以任務(wù)啟動的開銷會上升。

          并發(fā)數(shù)的修改,通過配置參數(shù)來改變spark.default.parallelism,如果是sql的話,可能通過修改spark.sql.shuffle.partitions來修改。

          第1項(xiàng) Repartition vs. Coalesce

          repartition和coalesce都能實(shí)現(xiàn)數(shù)據(jù)分區(qū)的動態(tài)調(diào)整,但需要注意的是repartition會導(dǎo)致shuffle操作,而coalesce不會。

          第2節(jié) reduceByKey vs. groupBy

          groupBy操作應(yīng)該盡可能的避免,第一是有可能造成大量的網(wǎng)絡(luò)開銷,第二是可能導(dǎo)致OOM。以WordCount為例來演示reduceByKey和groupBy的差異

          reduceByKey     sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).reduceByKey(_ + _)

          圖片描述

          圖3:reduceByKey的Shuffle過程

          Shuffle過程如圖2所示

          groupByKey     sc.textFile(“README.md”).map(l=>l.split(“,”)).map(w=>(w,1)).groupByKey.map(r=>(r._1,r._2.sum))

          圖片描述

          圖4:groupByKey的Shuffle過程

          建議: 盡可能使用reduceByKey, aggregateByKey, foldByKey和combineByKey
          假設(shè)有一RDD如下所示,求每個key的均值

          val data = sc.parallelize( List((0, 2.), (0, 4.), (1, 0.), (1, 10.), (1, 20.)) )

          方法一:reduceByKey

          data.map(r=>(r._1, (r.2,1))).reduceByKey((a,b)=>(a._1 + b._1, a._2 + b._2)).map(r=>(r._1,(r._2._1/r._2._2)).foreach(println)

          方法二:combineByKey

          data.combineByKey(value=>(value,1),     (x:(Double, Int), value:Double)=> (x._1+value, x._2 + 1),     (x:(Double,Int), y:(Double, Int))=>(x._1 + y._1, x._2 + y._2))

          第3節(jié) BroadcastHashJoin vs. ShuffleHashJoin

          在Join過程中,經(jīng)常會遇到大表和小表的join. 為了提高效率可以使用BroadcastHashJoin, 預(yù)先將小表的內(nèi)容廣播到各個Executor, 這樣將避免針對小表的Shuffle過程,從而極大的提高運(yùn)行效率。

          其實(shí)BroadCastHashJoin核心就是利用了BroadCast函數(shù),如果理解清楚broadcast的優(yōu)點(diǎn),就能比較好的明白BroadcastHashJoin的優(yōu)勢所在。

          以下是一個簡單使用broadcast的示例程序。

          val lst = 1 to 100 toList val exampleRDD = sc.makeRDD(1 to 20 toSeq, 2) val broadcastLst = sc.broadcast(lst) exampleRDD.filter(i=>broadcastLst.valuecontains(i)).collect.foreach(println)

          第4節(jié) map vs. mapPartitions

          有時需要將計(jì)算結(jié)果存儲到外部數(shù)據(jù)庫,勢必會建立到外部數(shù)據(jù)庫的連接。應(yīng)該盡可能的讓更多的元素共享同一個數(shù)據(jù)連接而不是每一個元素的處理時都去建立數(shù)據(jù)庫連接。
          在這種情況下,mapPartitions和foreachPartitons將比map操作高效的多。

          第5節(jié) 數(shù)據(jù)就地讀取

          移動計(jì)算的開銷遠(yuǎn)遠(yuǎn)低于移動數(shù)據(jù)的開銷。

          Spark中每個Task都需要相應(yīng)的輸入數(shù)據(jù),因此輸入數(shù)據(jù)的位置對于Task的性能變得很重要。按照數(shù)據(jù)獲取的速度來區(qū)分,由快到慢分別是:

          1.PROCESS_LOCAL
          2.NODE_LOCAL
          3.RACK_LOCAL

          Spark在Task執(zhí)行的時候會盡優(yōu)先考慮最快的數(shù)據(jù)獲取方式,如果想盡可能的在更多的機(jī)器上啟動Task,那么可以通過調(diào)低spark.locality.wait的值來實(shí)現(xiàn), 默認(rèn)值是3s。

          除 了HDFS,Spark能夠支持的數(shù)據(jù)源越來越多,如Cassandra, HBase,MongoDB等知名的NoSQL數(shù)據(jù)庫,隨著Elasticsearch的日漸興起,spark和elasticsearch組合起來提供 高速的查詢解決方案也成為一種有益的嘗試。

          上述提到的外部數(shù)據(jù)源面臨的一個相同問題就是如何讓spark快速讀取其中的數(shù)據(jù), 盡可能的將計(jì)算結(jié)點(diǎn)和數(shù)據(jù)結(jié)點(diǎn)部署在一起是達(dá)到該目標(biāo)的基本方法,比如在部署Hadoop集群的時候,可以將HDFS的DataNode和Spark Worker共享一臺機(jī)器。

          以cassandra為例,如果Spark的部署和Cassandra的機(jī)器有部分重疊,那么在讀取Cassandra中數(shù)據(jù)的時候,通過調(diào)低spark.locality.wait就可以在沒有部署Cassandra的機(jī)器上啟動Spark Task。

          對于Cassandra, 可以在部署Cassandra的機(jī)器上部署Spark Worker,需要注意的是Cassandra的compaction操作會極大的消耗CPU,因此在為Spark Worker配置CPU核數(shù)時,需要將這些因素綜合在一起進(jìn)行考慮。

          這一部分的代碼邏輯可以參考源碼TaskSetManager::addPendingTask

          private def addPendingTask(index: Int, readding: Boolean = false) {   // Utility method that adds `index` to a list only if readding=false or it's not already there   def addTo(list: ArrayBuffer[Int]) {     if (!readding || !list.contains(index)) {       list += index     }   }    for (loc <- tasks(index).preferredLocations) {     loc match {       case e: ExecutorCacheTaskLocation =>         addTo(pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer))       case e: HDFSCacheTaskLocation => {         val exe = sched.getExecutorsAliveOnHost(loc.host)         exe match {           case Some(set) => {             for (e <- set) {               addTo(pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer))             }             logInfo(s"Pending task $index has a cached location at ${e.host} " +               ", where there are executors " + set.mkString(","))           }           case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +               ", but there are no executors alive there.")         }       }       case _ => Unit     }     addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))     for (rack <- sched.getRackForHost(loc.host)) {       addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))     }   }    if (tasks(index).preferredLocations == Nil) {     addTo(pendingTasksWithNoPrefs)   }    if (!readding) {     allPendingTasks += index  // No point scanning this whole list to find the old task there   } }

          如果準(zhǔn)備讓spark支持新的存儲源,進(jìn)而開發(fā)相應(yīng)的RDD,與位置相關(guān)的部分就是自定義getPreferredLocations函數(shù),以elasticsearch-hadoop中的EsRDD為例,其代碼實(shí)現(xiàn)如下。

          override def getPreferredLocations(split: Partition): Seq[String] = {   val esSplit = split.asInstanceOf[EsPartition]   val ip = esSplit.esPartition.nodeIp   if (ip != null) Seq(ip) else Nil }

          第6節(jié) 序列化

          使用好的序列化算法能夠提高運(yùn)行速度,同時能夠減少內(nèi)存的使用。

          Spark在Shuffle的時候要將數(shù)據(jù)先存儲到磁盤中,存儲的內(nèi)容是經(jīng)過序列化的。序列化的過程牽涉到兩大基本考慮的因素,一是序列化的速度,二是序列化后內(nèi)容所占用的大小。

          kryoSerializer與默認(rèn)的javaSerializer相比,在序列化速度和序列化結(jié)果的大小方面都具有極大的優(yōu)勢。所以建議在應(yīng)用程序配置中使用KryoSerializer.

          spark.serializer  org.apache.spark.serializer.KryoSerializer

          默認(rèn)的cache沒有對緩存的對象進(jìn)行序列化,使用的StorageLevel是MEMORY_ONLY,這意味著要占用比較大的內(nèi)存。可以通過指定persist中的參數(shù)來對緩存內(nèi)容進(jìn)行序列化。

          exampleRDD.persist(MEMORY_ONLY_SER)

          需要特別指出的是persist函數(shù)是等到j(luò)ob執(zhí)行的時候才會將數(shù)據(jù)緩存起來,屬于延遲執(zhí)行; 而unpersist函數(shù)則是立即執(zhí)行,緩存會被立即清除。

          作者簡介:許鵬, 《Apache Spark源碼剖析》作者,關(guān)注于大數(shù)據(jù)實(shí)時搜索和實(shí)時流數(shù)據(jù)處理,對elasticsearch, storm及drools多有研究,現(xiàn)就職于攜程。

          posted on 2016-03-02 14:12 SIMONE 閱讀(911) 評論(0)  編輯  收藏

          只有注冊用戶登錄后才能發(fā)表評論。


          網(wǎng)站導(dǎo)航:
           
          主站蜘蛛池模板: 呼图壁县| 依兰县| 汶上县| 奉化市| 陇西县| 景洪市| 德庆县| 南城县| 阿城市| 临澧县| 井陉县| 雷波县| 昌图县| 樟树市| 石泉县| 繁峙县| 泽州县| 伽师县| 淮安市| 宜城市| 定日县| 井陉县| 紫阳县| 鲁甸县| 台前县| 阿城市| 香港 | 钟山县| 株洲市| 周口市| 邓州市| 海淀区| 大方县| 芦溪县| 玉环县| 陇川县| 凌源市| 枣庄市| 双柏县| 永登县| 平泉县|