1 MapReduce概念 和 MapReduce編程模型
什么是MapReduce
MapReduce分而治之的思想
MapReduce編程分Map和Reduce階段
MapReduce編程執行步驟
編程模型
Map(in_keyin_value)
--->(out_keyintermediate_value) list
Reduce(out_keyintermediate_value) list
--->out_value list
2 應用MRJob編寫MapReduce代碼
mrjob 簡介
mrjob 安裝
pip install mrjob
mrjob實現WordCount
from mrjob.job import MRJob
class MRWordCount(MRJob):
#每一行從line中輸入
def mapper(self _ line):
for word in line.split():
yield word1
# word相同的 會走到同一個reduce
def reducer(self word counts):
yield word sum(counts)
if __name__ == __main__:
MRWordCount.run()
運行WordCount代碼
打開命令行 找到一篇文本文檔 敲如下命令:
python mr_word_count.py my_file.txt
運行MRJOB的不同方式
1、內嵌(-r inline)方式
特點是調試方便,啟動單一進程模擬任務執行狀態和結果,默認(-r inline)可以省略,輸出文件使用 > output-file 或-o output-file,比如下面兩種運行方式是等價的
python word_count.py -r inline input.txt > output.txt python word_count.py input.txt > output.txt
2、Hadoop(-r hadoop)方式
用于hadoop環境
python word_count.py -r hadoop hdfs:///test.txt -o hdfs:///output
支持Hadoop運行調度控制參數,如:
1)指定Hadoop任務調度優先級(VERY_HIGH|HIGH)如:--jobconf mapreduce.job.priority=VERY_HIGH。
2)Map及Reduce任務個數限制,如:--jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5
3 mrjob 實現 topN統計(實驗)
統計數據中出現次數最多的前n個數據
import sys
from mrjob.job import MRJobMRStep
import heapq
class TopNWords(MRJob):
def mapper(self _ line):
if line.strip() != "":
for word in line.strip().split():
yield word1
#介于mapper和reducer之間,用于臨時的將mapper輸出的數據進行統計
def combiner(self word counts):
yield wordsum(counts)
def reducer_sum(self word counts):
yield None(sum(counts)word)
#利用heapq將數據進行排序,將最大的2個取出
def top_n_reducer(self_word_cnts):
for cntword in heapq.nlargest(2word_cnts):
yield wordcnt
#實現steps方法用于指定自定義的mapper,comnbiner和reducer方法
def steps(self):
#傳入兩個step 定義了執行的順序
return [
MRStep(mapper=self.mapper
combiner=self.combiner
reducer=self.reducer_sum)
MRStep(reducer=self.top_n_reducer)
]
def main():
TopNWords.run()
if __name__==__main__:
main()
4 MapReduce原理詳解
單機程序計算流程
輸入數據--->讀取數據--->處理數據--->寫入數據--->輸出數據
Hadoop計算流程
input data:輸入數據
InputFormat:對數據進行切分,格式化處理
map:將前面切分的數據做map處理(將數據進行分類,輸出(kv)鍵值對數據)
shuffle&sort:將相同的數據放在一起,并對數據進行排序處理
reduce:將map輸出的數據進行hash計算,對每個map數據進行統計計算
OutputFormat:格式化輸出數據
map:將數據進行處理
buffer in memory:達到80%數據時,將數據鎖在內存上,將這部分輸出到磁盤上
partitions:在磁盤上有很多"小的數據",將這些數據進行歸并排序。
merge on disk:將所有的"小的數據"進行合并。
reduce:不同的reduce任務,會從map中對應的任務中copy數據,在reduce中同樣要進行merge操作
5 MapReduce架構
MapReduce架構 1.X
JobTracker:負責接收客戶作業提交,負責任務到作業節點上運行,檢查作業的狀態
TaskTracker:由JobTracker指派任務,定期向JobTracker匯報狀態,在每一個工作節點上永遠只會有一個TaskTracker
MapReduce2.X架構
ResourceManager:負責資源的管理,負責提交任務到NodeManager所在的節點運行,檢查節點的狀態
NodeManager:由ResourceManager指派任務,定期向ResourceManager匯報狀態
{{image.png(uploading...)}}
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/126017.html
閱讀 3532·2023-04-25 20:09
閱讀 3736·2022-06-28 19:00
閱讀 3056·2022-06-28 19:00
閱讀 3075·2022-06-28 19:00
閱讀 3168·2022-06-28 19:00
閱讀 2874·2022-06-28 19:00
閱讀 3038·2022-06-28 19:00
閱讀 2632·2022-06-28 19:00