摘要:過程中,各個節點上的相同都會先寫入本地磁盤文件中,然后其他節點需要通過網絡傳輸拉取各個節點上的磁盤文件中的相同。因此在過程中,可能會發生大量的磁盤文件讀寫的操作,以及數據的網絡傳輸操作。
需要對名為“hello.txt”的HDFS文件進行一次map操作,再進行一次reduce操作。也就是說,需要對一份數據執行兩次算子操作。
錯誤的做法:
對于同一份數據執行多次算子操作時,創建多個RDD。//這里執行了兩次textFile方法,針對同一個HDFS文件,創建了兩個RDD出來,然后分別對每個RDD都執行了一個算子操作。
這種情況下,Spark需要從HDFS上兩次加載hello.txt文件的內容,并創建兩個多帶帶的RDD;//第二次加載HDFS文件以及創建RDD的性能開銷,很明顯是白白浪費掉的。
val rdd1 = sc.textFile("hdfs://master:9000/hello.txt")rdd1.map(...)val rdd2 = sc.textFile("hdfs://master:9000/hello.txt")rdd2.reduce(...)
正確的用法:
對于一份數據執行多次算子操作時,只使用一個RDD。
錯誤的做法:
有一個
接著由于業務需要,對rdd1執行了一個map操作,創建了一個rdd2,而rdd2中的數據僅僅是rdd1中的value值而已,也就是說,rdd2是rdd1的子集。
JavaPairRDD rdd1 = ...JavaRDD rdd2 = rdd1.map(...)
分別對rdd1和rdd2執行了不同的算子操作。
rdd1.reduceByKey(...)rdd2.map(...)
正確的做法:
rdd2的數據完全就是rdd1的子集而已,卻創建了兩個rdd,并對兩個rdd都執行了一次算子操作。
此時會因為對rdd1執行map算子來創建rdd2,而多執行一次算子操作,進而增加性能開銷。
其實在這種情況下完全可以復用同一個RDD。
我們可以使用rdd1,既做reduceByKey操作,也做map操作。
JavaPairRDD rdd1 = ...rdd1.reduceByKey(...)rdd1.map(tuple._2...)
正確的做法:
cache()方法表示:使用非序列化的方式將RDD中的數據全部嘗試持久化到內存中。
此時再對rdd1執行兩次算子操作時,只有在第一次執行map算子時,才會將這個rdd1從源頭處計算一次。
第二次執行reduce算子時,就會直接從內存中提取數據進行計算,不會重復計算一個rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()rdd1.map(...)rdd1.reduce(...)
序列化的方式可以減少持久化的數據對內存/磁盤的占用量,進而避免內存被持久化數據占用過多,從而發生頻繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") .persist(StorageLevel.MEMORY_AND_DISK_SER)rdd1.map(...)rdd1.reduce(...)
注意:通常不建議使用DISK_ONLY和后綴為_2的級別:因為完全基于磁盤文件進行數據的讀寫,會導致性能急劇降低,導致網絡較大開銷
如果有可能的話,要盡量避免使用shuffle類算子,最消耗性能的地方就是shuffle過程。
shuffle過程中,各個節點上的相同key都會先寫入本地磁盤文件中,然后其他節點需要通過網絡傳輸拉取各個節點上的磁盤文件中的相同key。而且相同key都拉取到同一個節點進行聚合操作時,還有可能會因為一個節點上處理的key過多,導致內存不夠存放,進而溢寫到磁盤文件中。因此在shuffle過程中,可能會發生大量的磁盤文件讀寫的IO操作,以及數據的網絡傳輸操作。磁盤IO和網絡數據傳輸也是shuffle性能較差的主要原因。
盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。
傳統的join操作會導致shuffle操作。
因為兩個RDD中,相同的key都需要通過網絡拉取到一個節點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)
Broadcast+map的join操作,不會導致shuffle操作。
使用Broadcast將一個數據量較小的RDD作為廣播變量。
val rdd2Data = rdd2.collect()val rdd2DataBroadcast = sc.broadcast(rdd2Data)val rdd3 = rdd1.map(rdd2DataBroadcast...)
注意:以上操作,建議僅僅在rdd2的數據量比較少(比如幾百M,或者一兩G)的情況下使用。因為每個Executor的內存中,都會駐留一份rdd2的全量數據。
如果因為業務需要,一定要使用shuffle操作,無法用map類的算子來替代,那么盡量使用可以map-side預聚合的算子,類似于MapReduce中的本地combiner。map-side預聚合之后,每個節點本地就只會有一條相同的key,因為多條相同的key都被聚合起來了。其他節點在拉取所有節點上的相同key時,就會大大減少需要拉取的數據數量,從而也就減少了磁盤IO以及網絡傳輸開銷。
建議使用reduceByKey或者aggregateByKey算子來替代掉groupByKey算子
使用reduceByKey/aggregateByKey替代groupByKey : map-side
使用mapPartitions替代普通map : 函數執行頻率
使用foreachPartitions替代foreach : 函數執行頻率
使用filter之后進行coalesce操作 : filter后對分區進行壓縮
使用repartitionAndSortWithinPartitions替代repartition與sort類操作
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子,官方建議,如果需要在repartition重分區之后,還要進行排序,建議直接使用repartitionAndSortWithinPartitions算子
有時在開發過程中,會遇到需要在算子函數中使用外部變量的場景(尤其是大變量,比如100M以上的大集合),那么此時就應該使用Spark的廣播(Broadcast)功能來提升性能。
默認情況下,Spark會將該變量復制多個副本,通過網絡傳輸到task中,此時每個task都有一個變量副本。如果變量本身比較大的話(比如100M,甚至1G),那么大量的變量副本在網絡中傳輸的性能開銷,以及在各個節點的Executor中占用過多內存導致的頻繁GC,都會極大地影響性能。
廣播后的變量,會保證每個Executor的內存中,只駐留一份變量副本,而Executor中的task執行時共享該Executor中的那份變量副本。
1)在算子函數中使用到外部變量時,該變量會被序列化后進行網絡傳輸。
2)將自定義的類型作為RDD的泛型類型時(比如JavaRDD,Student是自定義類型),所有自定義類型對象,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable接口。
3)使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的字節數組。
Spark默認使用的是Java的序列化機制,你可以使用Kryo作為序列化類庫,效率要比Java的序列化機制要高
// 創建SparkConf對象。val conf = new SparkConf().setMaster(...).setAppName(...)// 設置序列化器為KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注冊要序列化的自定義類型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
當遇到userData和events進行join時,userData比較大,而且join操作比較頻繁,這個時候,可以先將userData調用了 partitionBy()分區,可以極大提高效率。
cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等都能夠受益
總結:如果遇到一個RDD頻繁和其他RDD進行Shuffle類操作,比如 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等,那么最好將該RDD通過partitionBy()操作進行預分區,這些操作在Shuffle過程中會減少Shuffle的數據量
Java中,有三種類型比較耗費內存:
1)對象,每個Java對象都有對象頭、引用等額外的信息,因此比較占用內存空間。
2)字符串,每個字符串內部都有一個字符數組以及長度等額外信息。
3)集合類型,比如HashMap、LinkedList等,因為集合類型內部通常會使用一些內部類來封裝集合元素,比如Map.Entry
Spark官方建議,在Spark編碼實現中,特別是對于算子函數中的代碼,盡量不要使用上述三種數據結構,盡量使用字符串替代對象,使用原始類型(比如Int、Long)替代字符串,使用數組替代集合類型,這樣盡可能地減少內存占用,從而降低GC頻率,提升性能。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/123982.html
摘要:正如我標題所說,簡歷被拒。看了我簡歷之后說頭條競爭激烈,我背景不夠,點到為止。。三準備面試其實從三月份投遞簡歷開始準備面試到四月份收,也不過個月的時間,但這都是建立在我過去一年的積累啊。 本文是 無精瘋 同學投稿的面試經歷 關注微信公眾號:進擊的java程序員K,即可獲取最新BAT面試資料一份 在此感謝 無精瘋 同學的分享 目錄: 印象中的頭條 面試背景 準備面試 ...
摘要:正如我標題所說,簡歷被拒。看了我簡歷之后說頭條競爭激烈,我背景不夠,點到為止。。三準備面試其實從三月份投遞簡歷開始準備面試到四月份收,也不過個月的時間,但這都是建立在我過去一年的積累啊。 本文是 無精瘋 同學投稿的面試經歷 關注微信公眾號:進擊的java程序員K,即可獲取最新BAT面試資料一份 在此感謝 無精瘋 同學的分享目錄:印象中的頭條面試背景準備面試頭條一面(Java+項目)頭條...
摘要:創新萌芽期望最頂點下調預期至低點回歸理想生產率平臺。而大數據已從頂峰滑落,和云計算接近谷底。對于迅速成長的中國市場,大公司也意味著大數據。三家對大數據的投入都是不惜余力的。 非商業轉載請注明作譯者、出處,并保留本文的原始鏈接:http://www.ituring.com.cn/article/177529 董飛,Coursera數據工程師。曾先后在創業公司酷迅,百度基礎架構組...
閱讀 2038·2021-11-19 11:37
閱讀 727·2021-11-11 16:54
閱讀 1176·2021-11-02 14:44
閱讀 3072·2021-09-02 15:40
閱讀 2377·2019-08-30 15:44
閱讀 967·2019-08-29 11:17
閱讀 1068·2019-08-26 14:06
閱讀 1561·2019-08-26 13:47