国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Spark入門階段一之掃盲筆記

starsfun / 3073人閱讀

摘要:同時集成了機器學習類庫。基于計算框架,將的分布式計算應用到機器學習領域。提供了一個簡單的聲明方法指定機器學習任務,并且動態地選擇最優的學習算法。宣稱其性能是的多倍。

介紹

spark是分布式并行數據處理框架
與mapreduce的區別:
mapreduce通常將中間結果放在hdfs上,spark是基于內存并行大數據框架,中間結果放在內存,對于迭代數據spark效率更高,mapreduce總是消耗大量時間排序,而有些場景不需要排序,spark可以避免不必要的排序所帶來的開銷,spark是一張有向無環圖,spark支持scala,python,java等
適用范圍:
spark更適合于迭代云端比較多的ml和dm運算,因為spark里面有rdd的抽象概念,spark比hadoop更通用,spark提供的數據集操作類型有很多,不像hadoop只提供map和reduce倆種操作,比如map,filter,flatmapt,sample,groupbykey,reducebykey,union,join,cogroup,mapvalues,sort,partionby等多種操作類型,spark
把這些操作稱為transformations,同時還提供count,collect,reduce,lookup,save等多種action操作。這些多種多樣的數據集操作類型,給開發上層應用的用戶提供了方便,各個處理節點之間的通信模型不在像hadoop那樣就是唯一的data shuffle一種模式,用戶可以明明,物化,控制中間結果的存儲,分區等,可以說編程模型比hadoop更靈活。

spark是基于內存的迭代計算框架,使用與需要多次操作特定數據集的應用場合,需要反復操作的次數越多,所需要讀取的數據量越大,受益越大,數據量小但是計算密集度較大的場合,受益就相對較小. 不過由于rdd的特性,spark不適用那種一部細粒度更新狀態的應用,例如web服務的存儲或者增量的web爬蟲和索引,就是對于那種增量修改的應用模型不合適。

spark和hadoop的結合:
spark可以直接對hdfs進行數據的讀寫,同樣支持spark on yarn。spark可以與mapreduce運行于同集群中,共享存儲資源與計算,數據倉庫shark實現上借用hive,幾乎和hive完全兼容。

四種spark運行模式,local模型用于測試開發,standlone 獨立集群模式,spark on yarn spark在yarn上 ,spark on mesos spark在mesos上。

應用:
企業大數據應用: 1,count 平均值 2.分類,對比 3.趨勢,統計分析 4,精準預測 人工智能
行業大數據案例:電商,傳媒,能源,交通

spark生態系統介紹:
spark 可以很容易和yarn結合,直接調用HDFS、Hbase上面的數據,和hadoop結合。
spark核心部分分為RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心組件解決了很多的大數據問題

Spark分為driver和executor,driver提交作業,executor是application早worknode上的進程,運行task,driver對應為sparkcontext。Spark的RDD操作有transformation、action。Transformation對RDD進行依賴包裝,RDD所對應的依賴都進行DAG的構建并保存,在worknode掛掉之后除了通過備份恢復還可以通過元數據對其保存的依賴再計算一次得到。當作業提交也就是調用runJob時,spark會根據RDD構建DAG圖,提交給DAGScheduler,這個DAGScheduler是在SparkContext創建時一同初始化的,他會對作業進行調度處理。當依賴圖構建好以后,從action開始進行解析,每一個操作作為一個task,每遇到shuffle就切割成為一個taskSet,并把數據輸出到磁盤,如果不是shuffle數據還在內存中存儲。就這樣再往前推進,直到沒有算子,然后運行從前面開始,如果沒有action的算子在這里不會執行,直到遇到action為止才開始運行,這就形成了spark的懶加載,taskset提交給TaskSheduler生成TaskSetManager并且提交給Executor運行,運行結束后反饋給DAGScheduler完成一個taskSet,之后再提交下一個,當TaskSet運行失敗時就返回DAGScheduler并重新再次創建。一個job里面可能有多個TaskSet,一個application可能包含多個job。

1、shark介紹:
shark基本上就是spark的框架基礎上提供和hive一樣的hivesql命令接口,為了最大程度的保持和hive的兼容性,shark使用hive的api來實現query parsing和logic plan generation,最后的physicalplan execution階段用spark代替hadoop mapreduce,用過配置shark參數,shark可以自動在內存中緩存特定的rdd,實現數據重用,進而加快特定數據集的檢索,同時,shark通過udf用戶自定義函數實現特定的數據分析學習算法,使得sql數據查詢和運算分析能結合在一起,最大化rdd的重復使用。

2、spark streaming介紹:
Spark Streaming 是 Spark 提供的對實時數據進行流式計算的組件,一般與kafka結合,基本的原理是將stream數據分成小的時間片段,以類似batch批量處理的方式來處理這些小部分數據。spark streaming構建在spark上,一方面是因為spark的低延遲執行引擎可以用于實時計算,此外小批量的處理方式使得他可以同時兼容批量和實時數據處理的邏輯和算法,方便了一些需要歷史數據和實時數據聯合分析的特定應用場景。
Spark Streaming也有一個StreamingContext,其核心是DStream,是通過以組時間序列上的連續RDD來組成的,包含一個有Time作為key、RDD作為value的結構體,每一個RDD都包含特定時間間隔的數據流,可以通過persist將其持久化。在接受不斷的數據流后,在blockGenerator中維護一個隊列,將流數據放到隊列中,等處理時間間隔到來后將其中的所有數據合并成為一個RDD(這一間隔中的數據)。其作業提交和spark相似,只不過在提交時拿到DStream內部的RDD并產生Job提交,RDD在action觸發之后,將job提交給jobManager中的JobQueue,又jobScheduler調度,JobScheduler將job提交到spark的job調度器,然后將job轉換成為大量的任務分發給spark集群執行。

3、Graphx
主要用于圖的計算。核心算法有PageRank、SVD奇異矩陣、TriangleConut等。

4、Spark SQL
是Spark新推出的交互式大數據SQL技術。把sql語句翻譯成Spark上的RDD操作可以支持Hive、Json等類型的數據。

5、Spark R
通過R語言調用spark,目前不會擁有像Scala或者java那樣廣泛的API,Spark通過RDD類提供Spark API,并且允許用戶使用R交互式方式在集群中運行任務。同時集成了MLlib機器學習類庫。

6、MLBase
從上到下包括了MLOptimizer(給使用者)、MLI(給算法使用者)、MLlib(給算法開發者)、Spark。也可以直接使用MLlib。ML Optimizer,一個優化機器學習選擇更合適的算法和相關參數的模塊,還有MLI進行特征抽取和高級ML編程 抽象算法實現API平臺,MLlib分布式機器學習庫,可以不斷擴充算法。MLRuntime基于spark計算框架,將Spark的分布式計算應用到機器學習領域。MLBase提供了一個簡單的聲明方法指定機器學習任務,并且動態地選擇最優的學習算法。

7、Tachyon
高容錯的分布式文件系統。宣稱其性能是HDFS的3000多倍。有類似java的接口,也實現了HDFS接口,所以Spark和MR程序不需要任何的修改就可以運行。目前支持HDFS、S3等。

什么是rdd:

rdd是spark最基本,也是最根本的數據抽象,RDD表示分布在多個計算節點上的可以并行操作的元素集合,rdd是只讀的,分區記錄的集合。
rdd支持兩種操作,1,轉換從現有的數據集創建一個新的數據集,2,動作 在數據集上運行計算后,返回一個值給驅動程序,例如,map就是一種轉換,他將數據集每一個元素都傳遞給函數,并返回一個新的分布數據集表示結果,另一個方面,reduce是一個動作,通過一些函數將所有的元組疊加起來,并將結果返回給driver程序,spark中的所有轉換都有惰性的,也就是說,他們并不會直接計算結果,相反的,他們只是記住應用哦個到基礎數據集上的這些轉換動作,例如,我們可以實現,通過map創建的一個新數據集,并在reduce使用,最終只返回reduce的結果給driver,而不是整個大的新數據集。默認情況下,每個轉換過的rdd都會在你在他之上執行一個動作時被重新計算,不過,你也可以使用persist方法,持久話一個rdd在內存中,在這種情況下,spark將會在集群中,保存相關元素,下次你查詢這個rdd是,他將能更快訪問,在磁盤上持久化數據集,或在集群間賦值數據集也是支持的。除了這些操作外,用戶還可以請求將rdd緩存起來,而且,用戶還可以通過partitioner類獲取rdd的分區順序,然后將另一個rdd按照同樣的方式分區。

如何操作rdd?
1、如何獲取rdd 1,從共享的文件系統獲取,hdfs,2.通過已存在的rdd轉換 3.將已存在的scala集合并行化,通過調用sparkcontext的parallelize方法實現 4.改變現有rdd的之久性,rdd是懶散,短暫的
2、操作rdd的倆個動作,1,actions:對數據集計算后返回一個數值value給驅動程序,例如redue將數據集的所有元素用某個函數聚合后,將最終結果返回給程序,2.transformation 根據數據集創建一個新的數據集,計算后返回一個新rdd;例如map將數據的每個元素講過某個函數計算后,返回一個姓的分布式數據集。

actions具體內容:

reduce(func)通過函數func聚集數據集中所有元素,func函數接受2個參數,返回一個值,這個函數必須是關聯性的,確保可以被正確的并發執行。

collect() 在driver的程序中,以數組的形式,返回數據集的所有元素,這通常會在使用filter或者其他操作后,返回一個縱溝小的數據自己在使用,直接將整個rdd集coloect返回,很可能會讓driver程序oom。

count() 返回數據集的元素個數

take(n) 返回一個數組,用數據集的前n個元素組成,注意,這個操作目前并非在多個節點上,并行執行,而是driver程序所在機制,單機計算所有的元素:注;gateway的內存壓力會增大,需要謹慎使用

first()返回數據集的第一個元素

saveAsTextFile(path) 將數據集的元素,以txtfile的形式,保存到本地文件系統,hdfs或者其他hadoop支持的文件系統,spark將會調用每個元素的tostring方法,并將他轉換成文件中一行文本。

saveAsSequenceFile(path)將數據集的元素,以sequencefile的格式,到指定的目錄下,本地系統,hdfs或者其他hadoop支持的文件系統,rdd的元組必須有key-value對組成,并都實現了hadoop的- writable接口或隱式可以轉換為wirtable

foreach(func)在數據集的每個元素上,運行函數func,這通常用于更新一個累加器變量,或者和外部存儲系統做交互。直接使用 rdd.foreach(println) 在local模式下是可行的,但是在cluster模式下是不行的,必須要執行collect()方法,將所有的數據拉取到本地,然后執行foreach()操作。如果是數據量比較小的話可以使用take方法,rdd.take(100).foreach(println)

transformation具體內容:

map(func) 返回一個新的分布式數據集,有每個原元素經過func函數轉換后組成

filter(func) 返回一個新的數據集,有經過func函數后返回值為true的原元素組成

flatmap(func)類似于map 但是每一個輸入元素,會被映射0到多個輸出元素,因此func函數的返回值是一個seq,而不是單一元素

sample(withReplacement,frac,seed) 給定的隨機種子seed,隨機抽樣出數量為frac的數據

union(otherdataset)返回一個新的數據集,由原數據集和參數聯合而成

intersection : 只返回兩個RDD中都有的元素,intersecton()在運行時會去掉所有重復的元素(單個RDD內重復元素也會一起移除)。 需要通過網絡混洗來發現共有數據。

distinct : 生成一個只包含不同元素的新RDD。需要注意:distinct() 操作的開銷很大,因為它需要將所有數據通過網絡進行混洗(shuffle),以確保每個元素只有一份。

subtract : 接受另一個RDD作為參數,返回一個由只存在在第一個RDD而不存在第二個RDD中的所有元素組成的RDD。 需要數據混洗。

cartesian : 返回所有可能的(a,b)對,其中a是源RDD中的元素,b是另一個RDD中的元素。

groupbykey(【num tasks】)在一個有kv對組成的數據集上調用,返回一個k,seq【v】對的數據集,注意,默認情況下,使用8個并行任務進行分組,你可以傳入num task可選參數,根絕數據量設置不同數目的task

reducebykey(func,【num tasks】)在一個kv對的數據集上使用,返回一個kv的數據集,key相同的值都被使用指定的reduce函數聚合在一起,和groupbykey類似,任務個數是第二個參數來配置

join(otherdataset,【num tasks】)在類型kev和kw類型的數據集上調用,返回一個k(v w)對,每個key中所有元素都在一起的數據集

groupwith(otherdataset,【num tasks】)在類型為kv和kw類型的數據集上調用,返回一個數據集,組成元組為k seq【v】seq[w]tuples ,這個在其他框架稱為cogroup

cartesian(otherdataset) 笛卡兒積,但在數據集t和u調用是,返回一個tu對的數據集,所有元素交互進行笛卡兒積。

持久化(緩存)

persist()

cache()

基本開發思路

每個saprk應用都有一個驅動器程序來發起集群上的各種并行操作。驅動器程序通過一個SparkContext對象來訪問Spark。這個對象代表對計算集群的一個連接。
一旦有了SparkContext,你就可以用它來創建RDD。要執行這些操作,啟動器程序一般要管理多個執行器(executor)節點。
可以先通過SparkConf對象來配置你的應用,然后基于這個SparkConf創建一個SparkContext對象。
創建SparkConf的基本方法,傳遞兩個參數:
1、集群URL:告訴Spark如何連接到集群上。
2、應用名:當連接到一個集群式,這個值可以幫助你在集群管理器的用戶界面中找到你的應用。

關閉Spark:調用SparkContext的stop()方法。或直接退出應用。(system.exit(0)/sys.exit())
在Spark中,對數據的所有操作不外乎是: 創建RDD、 轉化已有的RDD、調用RDD操作進行求值
Spark中的RDD是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。
當我們調用一個新的行動操作時,整個RDD都會從頭開始計算。要避免這種行為,用戶可以將中間結果持久化。

demo(Python版)

1、初始化sparkcontext

from pyspark import SparkConf, SparkContxt
conf = SparkConf().setMaster("local").setAppName("my app")
sc = SparkContext(conf=conf)

# 關閉連接
sc.stop()

2、RDD編程

# 從文件讀取數據
line = sc.textFile("README.md")
# parallelize 方法
line = sc.parallelize(["pandas","i like pandas"])


inputRDD = sc.textFile("log.txt")
errRDD = inputRDD.filter(lambda x:"error" in x)
warnRDD = inputRDD.filter(lambda x:"warning" in x)
bindRDD = errRDD.union(warnRDD)


bindRDD.count()
bindRDD.take(10)
# 返回全部數據集
bindRDD.collect()


# lambda 函數
word = rdd.filter(lambda s:"python" in s)
# def 定義的函數
def containsErr(s):
    return "error" in s
word = rdd.filter(containsErr)

2.1、RDD常見轉換操作
以 rdd={1,2,3,3} 為例的轉換操作

# 將函數應用與RDD中的每個元素,將返回值構建新的RDD
rdd.map(x => x+1)

# 將函數應用用RDD中的每個元素,將返回的迭代器的所有內容構成新的RDD。通常用于切分單詞。
rdd.flatMap(x=>x.to(3))  --> {1,2,3,2,3,3,3})

# 返回一個由通過傳給filter()的函數的元素組成的RDD
rdd.filter(x=>x!=1)  -->  {2,3,4}

# 去重
rdd.distinct()  -->  {1,2,3}

sample(withReplacement,fraction,[seed])
# 對RDD進行采樣,以及是否替換
rdd.sample(false,0.5)   -->  非確定的

以{1,2,3}和{3,4,5}的RDD轉換操作

# 求并集
rdd.union(other)  --> {1,2,3,4,5}

# 求交集
rdd.intersection(other)  --> {3}

# 移除一個RDD中的內容,相當于減去一個交集
rdd.subtract(other)  -->  {1,2}

# 與另一個RDD的笛卡爾積
rdd.cartesian(other)  --> {(1,3),(1,4)...(3,5)}

2.2、RDD常見行動操作
以{1,2,3,3}為列說明常見行動操作

# 返回RDD中所有的元素
rdd.collect()  --> {1,2,3,3}

# 計數
rdd.count()

# 各元素在RDD中出現的次數
rdd.countByValue()   --> {(1,1),(2,1),(3,2)}

take(num)
# 返回前n元素

top(n)
# 排序后的前n個元素

# 按照指定順序,從rdd中返回前n個元素
rdd.takeOrdered(2)(myOrdering)   --> {3,3}

takeSample(withReplacement,num,[seed])
# 從RDD中返回任意一些元素
rdd.takeSample(false,1)  --> 非確定的

# 并行整合rdd中所有的數據,比如sum
rdd.reduce((x,y)=>x+y)  --> 9

fold(zero)(func)
# 和reduce()一樣,但是需要提供初始值
rdd.fold(0)((x,y)=>x+y) --> 9

aggregate(zeroValue)(seqOp,combOp)
# 和reduce類似,但是通常返回不同類型的函數
aggregate((0,0))((x,y)=>(x._1+y,x._2+1),
                 (x,y)=>(x._1+y._1,x._2+y._2) )  --> 9

# 對RDD中的每個元素使用給定的函數
rdd.foreach(func)

2.3、持久化緩存

from pyspark.storage import StorageLvel
rdd.presist(StoragLevel.DISK_ONLY)

RDD.cache()

# 緩存的級別
# MEMORY_ONLY
# MEMORY_ONLY_SER
# MEMORY_AND_DISK  # 如果內存放不下,則溢出寫到磁盤上
# MEMORY_AND_DISK_SER  # 如果內存放不下,則溢出寫到磁盤上,在內存中存放序列化后的數據
# DISK_ONLY

# 移除緩存
RDD.unpersist()

3、鍵值對操作

# 以{(1,2),(3,4),(3,6)}為例

# 合并具有形同鍵的值
rdd.reduceByKey((x,y)=>x+y)  -->{(1,2),(3,10)}

# 對具有相同鍵的值分組
rdd.groupByKey()  --->{(1,[2]),(3,[4,6])}

combineByKey(createCombiner,mergeValue,mergeComBiners,partitioner)
# 使用不同的返回類型合并具有相同鍵的值。有多個參數分別對應聚合操作的各個階段,因而非常適合用來解釋聚合操作各個階段的功能劃分。
# 下面是求每個鍵的平均值
sumCount=num.combineByKey((lambda x:(x,1)),
                           (lambda x,y:(x[0]+y,x[1]+1)),
                           (lambda x,y:(x[0+y[0],x[1]+y[1]])))
sumCount.map(lambda key,xy:(key,xy[0]/xy[1])).collectAaMap()

# 對pairRDD的每個值應用一個函數而不改變鍵
rdd.mapVlues(x=>x+1)

# 對pairRDD的每個值應用一個返回迭代器的函數,然后對返回的每個元素都生成一個對應原鍵的鍵值對記錄,通常用于符號化
rdd.flatMapValues(x=>(x to 5))  -->{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}

# 返回一個僅含有鍵的RDD
rdd.keys()   ->{1,3,3}

# 返回一個僅包含值的RDD
rdd.values()   -->{2,4,6}

# 返回一個根據鍵排序的RDD
rdd.sortByKey(ascending=True)  -->{(1,2),(3,4),(3,6)}

3.1、兩個鍵值對RDD的轉換操作

# 以rdd={(1,2),(3,4),(3,6)} other={(3,9)} 為例

# 刪除rdd中鍵與other中鍵相同的元素
rdd.subtracByKey(other)  --> {(1,2)}

# 對兩個rdd內鏈接
rdd.join(other)   --> {(3,(4,9)),(3,(6,9))}

# 對兩個rdd進行連接操作,確保第一個rdd中的鍵必須存在(右外鏈接)
rdd.rightOuterJoin(other)   --> {(3,(some(4),9)),(3,(some(6),9))}

# 對兩個rdd進行連接操作,確保第二個rdd中的鍵必須存在(左外連接)
rdd.leftOuterJoin(other)  --> {(1,(2,None)),(3,(4,some(9))),(3,(6,some(9)))}

# 將兩個rdd中擁有相同鍵的數據分組到一起
rdd.congroup(other)  --> {(1,([2],[])),(3,([4,6],[9]))}

3.2、鍵值對Pair RDD的行動操作

# 以 rdd={(1,2),(3,4),(3,6)} 為例

# 對每個鍵對應的元素分別計數
rdd.countByKey()  --> {(1,1,),(3,2)})

# 將結果以映射表的形式返回,以便查詢
rdd.collectAsMap()  --> Map{(1,2),(2,6)}

# 返回給定鍵對應的所有值
rdd.lookup(3)   --> [4,6]

4、并行度調優
每個rdd都有固定數目的分區,分區數決定了在rdd上執行操作的并行度。 大多數操作符都能接受第二個參數,用來指定分組結果或者聚合結果的rdd的分區數。
比如 sc.parallelize(data).reduceByKey(lambda x,y:x+y,10) 指定分區數10
查看分區數 rdd.partitions.size或rdd.getNumPartitions ,改變分區的方法repartition()

5、數據讀取與保存
讀取txt文件,輸入的每一行都會成為RDD的一個元素。

# 讀取文件
input=sc.textFile("file:///home/holden/README.md")
# 保存文件
result.saveAsTextFile(outputFile)

讀取json

# 將json文件的每一行假設為一條記錄來處理
import json
data = input.map(lambda x:json.load(x))
# 寫
(data.filter(lambda x:x[lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile))

讀取csv,同樣是將讀取的文本的每一行當做一條記錄

import csv
from io import StringIO
def loadRecord(line):
    """解析一行csv記錄"""
    input = StringIO(line)
    reader = csv.DictReader(input,filednames=["name","favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

# 保存csv
def writeRecords(records):
    """寫出一些csv記錄"""
    output = StringIO()
    writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

讀取SequenceFile
Hadoop輸入輸出格式
關系型數據庫
HBase

6、Spark進階編程
6.1、兩種類型的共享變量

累加器(qccumulator):用于對信息聚合,提供了將工作節點中的值聚合到驅動器程序中的簡單語法。

廣播變量(broadcast variable):用來高效分發較大的對象,讓程序高效地向所有工作節點發送一個較大的值,以供一個或多個spark操作使用。

# 在python中累加空行,使用了累加器
file = sc.textFile(inputFile)
# 創建累加器并初始化為0
blankLine=sc.accumulator(0)

def extractCallSigs(line):
    global blankLine  # 訪問全局變量
    if (line==""):
        blankLine+=1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(output)


# 使用廣播變量查詢國家
# 查詢rdd中呼叫號對應的位置,將呼號前綴讀取為國家代碼來進行查詢
signPrefixes = sc.broadcast(loadCallSignTable())   # 廣播變量

def processSignCount(sign_count,signPrefixes):
    country=lookupCountry(sign_count[0],signPrefixes.value)
    count = sign_count[1]
    return (country,count)

countryContactCounts=(contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y)))
countryContactCounts.saveAsTextFile(output)

基于分區進行操作
spark提供基于分區的map和foreach,使部分代碼只對rdd的每個分區運行一次,可以幫助降低這些操作的代價。

# 按照分區執行的操作符

mapPartitions()
# 參數:該分區中元素的迭代器。返回:元素的迭代器
# 對于RRD[T]的函數簽名 :f:(iterator[T])  --> iterator[U]

mapPartitionsWithIndex()
# 參數:分區序號,以及每個分區中的元素的迭代器。返回:元素的迭代器
# 對于RRD[T]的函數簽名 :f:(int,iterator[T])  --> iterator[U]

foreachPartitions()
# 參數:元素迭代器。返回:無
# 對于RRD[T]的函數簽名 :f:(iterator(T))  -->Unit

數值RDD的操作

count()
# RDD中元素個數
mean()
# 元素平均值
sum()
# 
max()
min()
variance()  # 方差
sampleVariance()  # 從采樣中計算出的方差
stdev()  # 標準差
sampleStdev()   # 采用的標準差

7、基于MLlib的機器學習

# 邏輯回歸的垃圾郵件分類
from pyspark.mllib.regression import LabeldPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD

spam=sc.textFile("spam.txt")
normal = sc.textFile("normal.txt")

# 創建一個HashingTF實例來把郵件文本映射為包含10000個特征的向量
tf=HashingTF(numFeatures=10000)
# 各郵件都切分為單詞,每個單詞映射為一個特征
spamFeatures = spam.map(lambda email: tf.transForm(email.split(" ")))
normalFeatures = normal.map(lambda email: tf.transform(email.split(" ")))

# 創建LabelPoint數據集分別存放陽性(垃圾郵件)和陰性(正常郵件)的例子
positiveExample = spamFeatures.map(lambda features:LabeldPoint(1,features))
negativeExamples = normalFeatures.map(lambda features:labeldPoint(0,features))
trainingData = positiveExample.union(negativeExample)
trainingData.cache()  # 因為邏輯回歸是迭代算法,所以需要緩存訓練數據RDD

# 使用SGD算法
model = LogisticRegressionWithSGD.train(trainningData)

# 以陽性和陰性的例子分別測試。
# 首先用一樣的HashingTF特征來得到特征向量,然后對該向量應用得到的模型
posTest = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
negTest = tf.transform("Hi Dad, i started studying spark the other ...".split(" "))
print( "predict for postive test example:%g" % model.predict(posTest))
print( "predict for negative test example:%g" % model.predict(negTest))

MLlib包含一些特有的數據類型,對于Scala和Java,它們位于org.apache.spark.mllib下,對于Python則是位于pyspark.mllib下。

入門:

spark有兩個重要的抽象:

RDD,分布式彈性數據集,他是一個跨越多個節點的分布式集合。

另一個抽象是共享變量。spark支持兩種類型的共享變量:一個是廣播(broadcast variables)他可以緩存一個值在集群的各個節點。另一個是累加器(accumulators)他只能執行累加的操作,比如可以做計數器和求和。

初始化 Spark
在一個Spark程序中要做的第一件事就是創建一個SparkContext對象來告訴Spark如何連接一個集群。為了創建SparkContext,你首先需要創建一個SparkConf對象,這個對象會包含你的應用的一些相關信息。這個通常是通過下面的構造器來實現的:
new SparkContext(master, appName, [sparkHome], [jars])
參數說明:

master:用于指定所連接的 Spark 或者 Mesos 集群的 URL。

appName :應用的名稱,將會在集群的 Web 監控 UI 中顯示。

sparkHome:可選,你的集群機器上 Spark 的安裝路徑(所有機器上路徑必須一致)。

jars:可選,在本地機器上的 JAR 文件列表,其中包括你應用的代碼以及任何的依賴,Spark 將會把他們部署到所有的集群結點上。
在 python 中初始化,示例代碼如下:

//conf = SparkContext("local", "Hello Spark")
conf = SparkConf().setAppName("Hello Spark").setMaster("local")
sc = SparkContext(conf=conf)

說明:如果部署到集群,在分布式模式下運行,最后兩個參數是必須的,第一個參數可以是以下任一種形式:
Master URL 含義

local 默認值,使用一個 Worker 線程本地化運行(完全不并行)

local[N] 使用 N 個 Worker 線程本地化運行,N 為 * 時,表示使用系統中所有核

local[N,M] 第一個代表的是用到的核個數;第二個參數代表的是容許該作業失敗M次

spark://HOST:PORT 連接到指定的 Spark 單機版集群 master 進程所在的主機和端口

mesos://HOST:PORT 連接到指定的 Mesos 集群。host 參數是Moses master的hostname。端口默認是5050
如果你在一個集群上運行 spark-shell,則 master 參數默認為 local。在實際使用中,當你在集群中運行你的程序,你一般不會把 master 參數寫死在代碼中,而是通過用 spark-submit 運行程序來獲得這個參數。但是,在本地測試以及單元測試時,你仍需要自行傳入 local 來運行Spark程序。

運行代碼有幾種方式,一是通過 spark-shell 來運行 scala 代碼,一是編寫 java 代碼并打成包以 spark on yarn 方式運行,還有一種是通過 PySpark 來運行 python 代碼。

在 spark-shell 和 PySpark 命令行中,一個特殊的集成在解釋器里的 SparkContext 變量已經建立好了,變量名叫做 sc,創建你自己的 SparkContext 不會起作用。


 
    org.apache.spark
    spark-core_2.10
    2.1.1
  
  
    junit
    junit
    4.12
    test
  

創建一個簡單的spark程序:

public class SimpleApp {
  public static void main(String[] args) {
      // 文件路徑
      String logFile = "/home/wm/apps/spark-1.4.0-bin-hadoop2.6/README.md";
      SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local");
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaRDD logData = sc.textFile(logFile).cache();
      @SuppressWarnings("serial")
      long numAs = logData.filter(new Function() {
          public Boolean call(String s) throws Exception {
              return s.contains("a");
          }

      }).count();
      @SuppressWarnings("serial")
      long numBs = logData.filter(new Function() {

          public Boolean call(String s) throws Exception {
              return s.contains("b");
          }

      }).count();
      System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
      sc.close();
  }
}

Spark的核心就是圍繞著RDD,它是一個自動容錯的分布式數據集合。他有兩種方式創建,第一種就是在驅動程序中對一個集合進行并行化。第二種是來源于一個外部的存儲系統。比如:共享系統、HDFS、HBase或者任何提供任何Hadoop 輸入格式的數據源。

第一種:Parallelized Collections 創建這個集合需要調用那個JavaSparkContext的parallelize方法來初始化一個已經存在的集合。

List data = Arrays.asList(1,2,3,4,5);
JavaRDD distData = sc.parallelize(data);

這就創建了一個并行的集合,在這個集合上可以執行 distData.reduce((a, b) -> a + b)
在并行數組中一個很重要的參數是partitions,它來描述數組被切割的數據集數量。Spark會在每一個partitions上運行任務,這個partitions會被spark自動設置,一般都是集群中每個CPU上運行2-4partitions,但是也可以自己設置,可以通過parallelize (e.g. sc.parallelize(data, 10)),在有些地方把partitions成為 slices。

第二種:External Datasets

JavaRDD distFile = sc.textFile("data.txt");

textFile也可以設置partitions參數,一般都是一個block一個partitions,但是也可以自己設置,自己設置必須要不能少于block的數量。
針對Hadoop的其他輸入格式,你能用這個JavaSparkContext.hadoopRDD方法,你需要設置JobConf和輸入格式的類。也可以使用JavaSparkContext.newAPIHadoopRDD針對輸入格式是基于“new”的MapReduceAPI

demo(python) 分析 Nginx 日志中狀態碼出現次數

先將測試數據上傳到 hdfs:
$ hadoop fs -put access.log
然后,編寫一個 python 文件,保存為 SimpleApp.py:

from pyspark import SparkContext

logFile = "access.log"

sc = SparkContext("local", "Simple App")

rdd = sc.textFile(logFile).cache()

counts = rdd.map(lambda line: line.split()[8]).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey(lambda x: x) 

# This is just a demo on how to bring all the sorted data back to a single node.  
# In reality, we wouldn"t want to collect all the data to the driver node.
output = counts.collect()  
for (word, count) in output:  
    print "%s: %i" % (word, count)  

counts.saveAsTextFile("/data/result")

sc.stop()

接下來,運行下面代碼:

$ spark-submit  --master local[4]   SimpleApp.py
demo(java) 統計單詞出現次數
JavaRDD lines = sc.textFile("data.txt");
JavaPairRDD pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD counts = pairs.reduceByKey((a, b) -> a + b);
demo (java) 讀取HDFS中的數據,并簡單分析,最后結果寫入mysql數據庫中。
 
    org.apache.spark
    spark-core_2.10
    2.11


    mysql
    mysql-connector-java
    5.1.13


    org.apache.hadoop
    hadoop-client
    2.6.0


    junit
    junit
    4.12
    test

由于需要讀取HDFS中的數據,所以需要hadoop-client文件

在main函數中首先創建JavaSparkcontext對象。

SparkConf conf = new SparkConf().setAppName("FindError");
JavaSparkContext sc = new JavaSparkContext(conf);
/**
* 
* 列出指定目錄中的文件,這里的文件是不包括子目錄的。
* @param pathOfDirectory
*     目錄路徑
* @return
* @throws IOException 
*/
public static String[] findFilePathFromDir(String dst) throws IOException {
  Set filePathSet = new HashSet();
  String[] result = null;
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(URI.create(dst), conf);
  FileStatus fileList[] = fs.listStatus(new Path(dst));
  int size = fileList.length;
  for (int i = 0; i < size; i++) {
      filePathSet.add(fileList[i].getPath().toString());
  }
  if (filePathSet.size() > 0) {
      result = new String[filePathSet.size()];
      int i = 0;
      for (String str : filePathSet) {
          result[i++] = str;
      }
  }
  fs.close();
  return result;
}

依次遍歷文件路徑并為每個文件創建一個新的RDD然后計算出這個文件中包涵ERROR字符串的行數。

Map result = new HashMap();
if (filePaths != null) {
  for (String path : filePaths) {
      result.put(path, sc.textFile(path).filter(new Function() {

          public Boolean call(String line) throws Exception {
              return line.contains("ERROR");
          }

      }).count());
  }
}

將results中的數據寫入mysql中

/**
* 將結果寫入mysql中
* @param result
* @throws Exception 
*/
public static void wirteResultToMysql(Map result) throws Exception {
  String DBDRIVER = "com.mysql.jdbc.Driver";  
  //連接地址是由各個數據庫生產商多帶帶提供的,所以需要多帶帶記住  
  String DBURL = "jdbc:mysql://ip:3306/test";  
  //連接數據庫的用戶名  
  String DBUSER = "root";  
  //連接數據庫的密碼  
  String DBPASS = "root";
  Connection con = null; //表示數據庫的連接對象  
  PreparedStatement pstmt = null; //表示數據庫更新操作  
  String sql = "insert into aaa values(?,?)";  
  Class.forName(DBDRIVER); //1、使用CLASS 類加載驅動程序  
  con = DriverManager.getConnection(DBURL,DBUSER,DBPASS); //2、連接數據庫  
  pstmt = con.prepareStatement(sql); //使用預處理的方式創建對象  
  if (result != null) {
      for (String str : result.keySet()) {
          pstmt.setString(1, str);
          pstmt.setLong(2, result.get(str));
          pstmt.addBatch();
      }
  }
  //pstmt.executeUpdate(); //執行SQL 語句,更新數據庫  
  pstmt.executeBatch();
  pstmt.close();  
  con.close(); // 4、關閉數據庫  
}
共享變量

通常情況下,當一個函數傳遞給一個在遠程集群節點上運行的Spark操作(比如map和reduce)時,Spark會對涉及到的變量的所有副本執行這個函數。這些變量會被復制到每個機器上,而且這個過程不會被反饋給驅動程序。通常情況下,在任務之間讀寫共享變量是很低效的。但是,Spark仍然提供了有限的兩種共享變量類型用于常見的使用場景:廣播變量和累加器。
1、廣播變量
廣播變量允許程序員在每臺機器上保持一個只讀變量的緩存而不是將一個變量的拷貝傳遞給各個任務。它們可以被使用,比如,給每一個節點傳遞一份大輸入數據集的拷貝是很低效的。Spark 試圖使用高效的廣播算法來分布廣播變量,以此來降低通信花銷。 可以通過 SparkContext.broadcast(v) 來從變量 v 創建一個廣播變量。這個廣播變量是 v 的一個包裝,同時它的值可以功過調用 value 方法來獲得。以下的代碼展示了這一點:

broadcastVar = sc.broadcast([1, 2, 3])


>>> broadcastVar.value
[1, 2, 3]

在廣播變量被創建之后,在所有函數中都應當使用它來代替原來的變量v,這樣就可以保證v在節點之間只被傳遞一次。另外,v變量在被廣播之后不應該再被修改了,這樣可以確保每一個節點上儲存的廣播變量的一致性(如果這個變量后來又被傳輸給一個新的節點)。

2、累加器
累加器是在一個相關過程中只能被”累加”的變量,對這個變量的操作可以有效地被并行化。它們可以被用于實現計數器(就像在MapReduce過程中)或求和運算。Spark原生支持對數字類型的累加器,程序員也可以為其他新的類型添加支持。累加器被以一個名字創建之后,會在Spark的UI中顯示出來。這有助于了解計算的累進過程(注意:目前Python中不支持這個特性)。

可以通過SparkContext.accumulator(v)來從變量v創建一個累加器。在集群中運行的任務隨后可以使用add方法或+=操作符(在Scala和Python中)來向這個累加器中累加值。但是,他們不能讀取累加器中的值。只有驅動程序可以讀取累加器中的值,通過累加器的value方法。

以下的代碼展示了向一個累加器中累加數組元素的過程:

accum = sc.accumulator(0)
Accumulator

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
10

這段代碼利用了累加器對 int 類型的內建支持,程序員可以通過繼承 AccumulatorParam 類來創建自己想要的類型支持。AccumulatorParam 的接口提供了兩個方法:zero用于為你的數據類型提供零值;addInPlace 用于計算兩個值得和。比如,假設我們有一個 Vector類表示數學中的向量,我們可以這樣寫:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

累加器的更新操作只會被運行一次,Spark 提供了保證,每個任務中對累加器的更新操作都只會被運行一次。比如,重啟一個任務不會再次更新累加器。在轉化過程中,用戶應該留意每個任務的更新操作在任務或作業重新運算時是否被執行了超過一次。

累加器不會改變Spark 的惰性求值模型。如果累加器在對RDD的操作中被更新了,它們的值只會在啟動操作中作為 RDD 計算過程中的一部分被更新。所以,在一個懶惰的轉化操作中調用累加器的更新,并沒法保證會被及時運行。 下面的代碼段展示了這一點:

accum = sc.accumulator(0)
data.map(lambda x => acc.add(x); f(x))
// 這里,accum任然是0,因為沒有action算子,所以map也不會進行實際的計算
任務的提交以及Standalone集群模式的部署

參考官方文檔:http://spark.apache.org/docs/...
spark-submit
首先需要打包代碼,如果你的代碼需要依賴其他的包環境則需要多帶帶的打包這些依賴,應為cluster會將所有依賴的jar包分發到各個節點上進行使用。推薦的方法是將依賴包和程序都統一的打成一個包,這樣就可以直接使用spark-submit方法來運行,具體的pom.xml配置如下:


     
        org.apache.spark
        spark-core_2.10
        2.11
        provided
    
    
        mysql
        mysql-connector-java
        5.1.13
    
    
        org.apache.hadoop
        hadoop-client
        2.6.0
        provided
    
    
        junit
        junit
        4.11
        test
    


    
        
            org.apache.maven.plugins
            maven-compiler-plugin
            2.3.2
            
                
                1.7
                1.7
            
        
        
            maven-assembly-plugin
            2.5.5
            
                
                    jar-with-dependencies
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    

spark && hadoop 的scope值都設置為provided
在服務器上提交的命令如下:

./bin/spark-submit 
  --class 
  --master  
  --deploy-mode  
  --conf = 
  ... # other options
   
  [application-arguments]

spark-submit 可以加載一個配置文件,默認是加載在conf/spark-defaults.conf

單元測試

Spark對所有常見的單元測試框架提供友好的支持。你只需要在測試中創建一個SparkContext對象,然后吧master URL設為local,運行測試操作,最后調用 SparkContext.stop() 來停止測試。注意,一定要在 finally 代碼塊或者單元測試框架的 tearDown方法里調用SparkContext.stop(),因為Spark不支持同一程序中有多個SparkContext對象同時運行。

部署

1、Spark Standalone Mode
除了運行在Mesos和YARN集群之外,spark也提供了簡單的獨立部署模式。可以通過手動的啟動master和worker,也可以通過spark提供的啟動腳本來啟動。獨立部署也可以通過運行在一個機器上,進行測試。
為了安裝你需要放置一個編譯好的spark版本到每個機器上。
啟動集群有兩種方式,一種是手動啟動,另一種是通過啟動腳本啟動。
1.1、手動啟動spark集群
啟動一個獨立的master可以使用如下的命令:
./sbin/start-master.sh
一旦啟動可以通過訪問:http://localhost:8080端口訪問master
可以使用如下的命令來使worker節點連接到master上:
./sbin/start-slave.sh
worker在加入到master后可以訪問master的http://localhost:8080,可以看到被加入的worker節點的信息。
在啟動master和worker的時候可以帶上參數進行設置,參數的列表如下:其中比較重要的是:
-c CORES, 這個是指定多少個cpu分配給spark使用,默認是全部cpu
-m MEM,這個是指定多少的內存分配給spark使用,默認是全部的內存的減去1g的操作系統內存全部分配給spark使用。一般的格式是1000M or 2G
-d DIR, 這個指定spark任務的日志輸出目錄。
–properties-file FILE 指定spark指定加載的配置文件的路徑默認是: conf/spark-defaults.conf

1.2、腳本方式部署
通過spark的部署腳本部署首先需要在spark的主目錄下創建一個conf/slaves的文件,這個文件中每一行代表一個worker的hostname.需要注意的是,master訪問worker節點是通過SSH訪問的,所以需要master通過ssh無密碼的登錄到worker,否則需要設置一個 SPARK_SSH_FOREGROUND的環境變量,這個變量的值就是每個worker的密碼

然后可以通過spark安裝目錄下的sbin/….sh文件進行啟動, 如果要啟動和停止master和slave可以使用:
sbin/start-all.sh
sbin/stop-all.sh
注意的是這些腳本必須是在master機器上執行

同時你可以通過配置集群的 conf/spark-env.sh文件來進一步配置集群的環境。但是這也文件需要通過拷貝conf/spark-env.sh.template文件來創建,并且需要把這個文件拷貝到所有的worker節點上。
其中: SPARK_MASTER_OPTS && SPARK_WORKER_OPTS 兩個配置項比較復雜。
通過在SparkContext構造器中傳入spark://IP:PORT這個來啟用這個集群。同時可以在交互式的方式啟動腳本中使用:./bin/spark-shell –master spark://IP:PORT 來啟動集群執行。
獨立部署模式的集群現在只是簡單的支持FIFO調度。 為了允許多個并發用戶,可以通過SparkConf設置每個應用程序需要的資源的最大數。默認情況下,它會請求使用集群的全部的核,而這只是同時運行一個應用程序才回有意義。

val conf = new SparkConf()
             .setMaster(...)
             .setAppName(...)
             .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

除了可以在程序中指定你也可以在spark-env.sh中設置默認的值,export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores="

2、spark的高可用設置
spark的高可用設置有兩種,一種是通過Zookeeper來實現,另一種是通過本地文件系統來實現。

2.1、使用ZooKeeper備份master,利用zookeeper提供的領導選舉和狀態保存,你可以讓更多的master連接到zookeepre實例。一個將會被選舉為leader其他的則會保存備份他的狀態。如果master死掉,zookeeper可以選舉一個新的leader,整個過程需要1到2分鐘的時間,但是這個過程只會對新的任務調度有影響。為了使用這種方式需要的配置項為:SPARK_DAEMON_JAVA_OPTS,這個配置項有三個配置信息:spark.deploy.recoveryMode/spark.deploy.zookeeper.url/spark.deploy.zookeeper.dir

2.2、使用本地文件系統來恢復該節點。為了使用這種方式需要的配置項為:SPARK_DAEMON_JAVA_OPTS,這個配置項有兩個配置信息:spark.deploy.recoveryMode、spark.deploy.recoveryDirectory

Spark架構與原理


Spark架構采用了分布式計算中的Master-Slave模型。Master是對應集群中的含有Master進程的節點,Slave是集群中含有Worker進程的節點。Master作為整個集群的控制器,負責整個集群的正常運行;Worker相當于是計算節點,接收主節點命令與進行狀態匯報;Executor負責任務的執行;Cluster作為用戶的客戶端負責提交應用,Driver負責控制一個應用的執行。
Spark集群部署后,需要在主節點和從節點分別啟動Master進程和Woker進程,對整個集群進行控制。在一個Spark應用的執行過程中,Driver和Worker是兩個重要角色。Driver程序是應用邏輯執行的起點,負責作業的調度,即Task任務的分發,而多個Worker用來管理計算節點和創建Executor并行處理任務。在執行階段,Driver會將Task和Task所依賴的
file和jar序列化后傳遞給對應的Worker機器,同時Exucutor對相應數據分區的任務進行處理。
下面詳細介紹Spark的架構中的基本組件。

ClusterManager:在Standalone模式中即為Master(主節點),控制整個集群,監控Worker。在YARN模式中為資源管理器。

Worker:從節點,負責控制計算節點,啟動Executor或Driver。在YARN模式中為NodeManager,負責計算節點的控制。
Spark整體流程為:Client提交應用,Master找到一個Worker啟動Driver,Driver向Master或者資源管理器申請資源,之后將應用轉化為RDD Graph,再由DAGScheduler將RDD Graph轉化為Stage的有向無環圖提交給TaskScheduler,由TaskScheduler提交任務給Executor執行。在任務執行過程中,其他組件協同工作,確保整個應用順利進行。

計算模型

Application:應用。可以認為是多次批量計算組合起來的過程,在物理上可以表現為你寫的程序包+部署配置。應用的概念類似于計算機中的程序,它只是一個藍本,尚沒有運行起來。

RDD:Resilient Distributed Datasets,彈性分布式數據集。RDD即是計算模型里的一個概念,也是你編程時用到的一種類。一個RDD可以認為是spark在執行分布式計算時的 一批相同來源、相同結構、相同用途的數據集,這個數據集可能被切割成多個分區,分布在不同的機器上,無論如何,這個數據集被稱為一個RDD。在編程 時,RDD對象就對應了這個數據集,并且RDD對象被當作一個數據操作的基本單位。比如,對某個RDD對象進行map操作,其實就相當于將數據集中的每個 分區的每一條數據進行了map映射。

Partition:分區。一個RDD在物理上被切割成多個數據子集,分布在不同的機器上。每個數據子集叫一個分區。

RDD Graph:RDD組成的DAG(有向無環圖)。RDD是不可變的,一個RDD經過某種操作后,會生成一個新的RDD。這樣說來,一個 Application中的程序,其內容基本上都是對各種RDD的操作,從源RDD,經過各種計算,產生中間RDD,最后生成你想要的RDD并輸出。這個 過程中的各個RDD,會構成一個有向無環圖。

Lineage:血統。RDD這個概念本身包含了這種信息“由哪個父類RDD經過哪種操作得到”。所以某個RDD可以通過不斷尋找父類,找到最原始的那個RDD。這條繼承路徑就認為是RDD的血統。

Job:從Application和RDD Graph的概念可以知道,一個應用往往對應了一個RDD Graph。這個應用在準備被spark集群運行前,實際上就是會生成一個或多個RDD Graph結構,而一個RDD Graph,又可以生成一個或多個Job。一個Job可以認為就是會最終輸出一個結果RDD(后面會介紹,實際上這是action操作)的一條由RDD組 織而成的計算,在Application生成的RDD Graph上表現為一個子圖。Job在spark里應用里也是一個被調度的單位。

寬依賴:RDD生成另一個RDD時,各個兩個父子RDD間分區的對應關系,被叫做RDD間依賴。寬依賴就是子RDD的某個分區,依賴父RDD的全部分區。

窄依賴:窄依賴就是子RDD的某個分區,只依賴常數個父RDD的分區。寬窄依賴的區別如下圖所示。

Stage:Stage可以理解為完成一個Job的不同階段。一個Job被劃分為多個Stage,每個Stage又包含了對多個RDD的多個操作。一個Stage里,一般包含了一個寬依賴操作,或者多個窄依賴操作。
窄依賴是指前一個rdd計算能出一個唯一的rdd,比如map或者filter等;寬依賴則是指多個rdd生成一個或者多個rdd的操作,比如groupbykey reducebykey等,這種寬依賴通常會進行shuffle。

算子:父子RDD間的某種操作,被叫某種算子。比如下面會介紹的map,filter,groupByKey等。算子可從多個維度分類,之后再介紹。

Task:一個分區對應一個Task。實際上一個Task就是在一個Stage范圍內,某個Executor所要執行的算子。

TaskSet:一個Stage范圍內,所有相同的Task被稱為一個TaskSet。

DAGScheduler:DAGScheduler用于根據RDD DAG切分Stage,并維護各個Stage的先后依賴關系,相當于完成了一個Job內的不同Stage間的調度策略。

TasksetManager:管理一個TaskSet,并決定了這個TaskSet中各個Task的分發策略。

TaskScheduler:執行實際的Task分發操作。

SparkUI、History Server:

SparkUI: 4044
History Server:18080
怎么看?http://www.cnblogs.com/xing90...

參考

http://blog.csdn.net/qq_26562...
http://blog.csdn.net/suzyu123...
http://www.cnblogs.com/helloc...
http://blog.csdn.net/suzyu123...
http://www.jianshu.com/nb/340...
http://www.cnblogs.com/ainima...
http://www.chinahadoop.cn/gro...
https://yq.aliyun.com/article...
http://ifeve.com/category/spa...

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67244.html

相關文章

  • Java9模塊化學習筆記一之快速入門

    摘要:如果你想查看運行時模塊的加載過程輸出結果表示為模塊,由于我限制了不再往下輸出了,而我們模塊又沒有別的額外依賴,所以僅有這行輸出。 jdk9模塊快速入門 列出自帶模塊:java --list-modulesmac多版本jdk共存:http://adolphor.com/blog/2016...模塊規則示意圖:showImg(https://segmentfault.com/img/bVb...

    cjie 評論0 收藏0
  • 零基礎如何學爬蟲技術

    摘要:楚江數據是專業的互聯網數據技術服務,現整理出零基礎如何學爬蟲技術以供學習,。本文來源知乎作者路人甲鏈接楚江數據提供網站數據采集和爬蟲軟件定制開發服務,服務范圍涵蓋社交網絡電子商務分類信息學術研究等。 楚江數據是專業的互聯網數據技術服務,現整理出零基礎如何學爬蟲技術以供學習,http://www.chujiangdata.com。 第一:Python爬蟲學習系列教程(來源于某博主:htt...

    KunMinX 評論0 收藏0
  • Python爬蟲學習路線

    摘要:以下這些項目,你拿來學習學習練練手。當你每個步驟都能做到很優秀的時候,你應該考慮如何組合這四個步驟,使你的爬蟲達到效率最高,也就是所謂的爬蟲策略問題,爬蟲策略學習不是一朝一夕的事情,建議多看看一些比較優秀的爬蟲的設計方案,比如說。 (一)如何學習Python 學習Python大致可以分為以下幾個階段: 1.剛上手的時候肯定是先過一遍Python最基本的知識,比如說:變量、數據結構、語法...

    liaoyg8023 評論0 收藏0
  • Spark綜合學習筆記(三)搜狗搜索日志分析

    摘要:學習致謝一數據數據網站二需求針對用戶查詢日志數據中不同字段,使用讀取日志數據,封裝到數據集中,調用函數和函數進行處理不同業務統計分析三分詞工具測試使用比較流行好用的中文分區面向生產環境的自然語言處理工具包,是由一系列模 ...

    AZmake 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<