国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

分布式計算框架MapReduce

Tecode / 2090人閱讀

1 MapReduce概念 和 MapReduce編程模型
什么是MapReduce

  • 源于Google的MapReduce論文(2004年12月)
  • Hadoop的MapReduce是Google論文的開源實現
  • MapReduce優點: 海量數據離線處理&易開發
  • MapReduce缺點: 實時流式計算

MapReduce分而治之的思想

  • 數錢實例:一堆鈔票,各種面值分別是多少
  • 單點策略
  • 一個人數所有的鈔票,數出各種面值有多少張
  • 分治策略
  • 每個人分得一堆鈔票,數出各種面值有多少張
  • 匯總,每個人負責統計一種面值
  • 解決數據可以切割進行計算的應用

MapReduce編程分Map和Reduce階段

  • 將作業拆分成Map階段和Reduce階段
  • Map階段 Map Tasks 分:把復雜的問題分解為若干"簡單的任務"
  • Reduce階段: Reduce Tasks 合:reduce

MapReduce編程執行步驟

  • 準備MapReduce的輸入數據
  • 準備Mapper數據
  • Shuffle
  • Reduce處理
  • 結果輸出

編程模型

  • 借鑒函數式編程方式
  • 用戶只需要實現兩個函數接口:

Map(in_keyin_value)

--->(out_keyintermediate_value) list

Reduce(out_keyintermediate_value) list

--->out_value list

  • Word Count 詞頻統計案例

2 應用MRJob編寫MapReduce代碼
mrjob 簡介

  • 使用python開發在Hadoop上運行的程序 mrjob是最簡單的方式
  • mrjob程序可以在本地測試運行也可以部署到Hadoop集群上運行
  • 如果不想成為hadoop專家 但是需要利用Hadoop寫MapReduce代碼mrJob是很好的選擇

mrjob 安裝

  • 使用pip安裝

pip install mrjob
mrjob實現WordCount

from mrjob.job import MRJob

class MRWordCount(MRJob):

    #每一行從line中輸入
    def mapper(self _ line):
        for word in line.split():
            yield word1

    # word相同的 會走到同一個reduce
    def reducer(self word counts):
        yield word sum(counts)

if __name__ == __main__:
    MRWordCount.run()

運行WordCount代碼

打開命令行 找到一篇文本文檔 敲如下命令:

python mr_word_count.py my_file.txt

運行MRJOB的不同方式

1、內嵌(-r inline)方式

特點是調試方便,啟動單一進程模擬任務執行狀態和結果,默認(-r inline)可以省略,輸出文件使用 > output-file 或-o output-file,比如下面兩種運行方式是等價的

python word_count.py -r inline input.txt > output.txt python word_count.py input.txt > output.txt

2、Hadoop(-r hadoop)方式

用于hadoop環境

python word_count.py -r hadoop hdfs:///test.txt -o  hdfs:///output

支持Hadoop運行調度控制參數,如:

1)指定Hadoop任務調度優先級(VERY_HIGH|HIGH)如:--jobconf mapreduce.job.priority=VERY_HIGH。

2)Map及Reduce任務個數限制,如:--jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5

3 mrjob 實現 topN統計(實驗)
統計數據中出現次數最多的前n個數據

import sys
from mrjob.job import MRJobMRStep
import heapq

class TopNWords(MRJob):
    def mapper(self _ line):
        if line.strip() != "":
            for word in line.strip().split():
                yield word1

    #介于mapper和reducer之間,用于臨時的將mapper輸出的數據進行統計
    def combiner(self word counts):
        yield wordsum(counts)

    def reducer_sum(self word counts):
        yield None(sum(counts)word)

    #利用heapq將數據進行排序,將最大的2個取出
    def top_n_reducer(self_word_cnts):
        for cntword in heapq.nlargest(2word_cnts):
            yield wordcnt

    #實現steps方法用于指定自定義的mapper,comnbiner和reducer方法
    def steps(self):
        #傳入兩個step 定義了執行的順序
        return [
            MRStep(mapper=self.mapper
                   combiner=self.combiner
                   reducer=self.reducer_sum)
            MRStep(reducer=self.top_n_reducer)
        ]

def main():
    TopNWords.run()

if __name__==__main__:
    main()

4 MapReduce原理詳解
單機程序計算流程

輸入數據--->讀取數據--->處理數據--->寫入數據--->輸出數據

Hadoop計算流程

input data:輸入數據

InputFormat:對數據進行切分,格式化處理

map:將前面切分的數據做map處理(將數據進行分類,輸出(kv)鍵值對數據)

shuffle&sort:將相同的數據放在一起,并對數據進行排序處理

reduce:將map輸出的數據進行hash計算,對每個map數據進行統計計算

OutputFormat:格式化輸出數據
image.png

image.png

map:將數據進行處理

buffer in memory:達到80%數據時,將數據鎖在內存上,將這部分輸出到磁盤上

partitions:在磁盤上有很多"小的數據",將這些數據進行歸并排序。

merge on disk:將所有的"小的數據"進行合并。

reduce:不同的reduce任務,會從map中對應的任務中copy數據,在reduce中同樣要進行merge操作

5 MapReduce架構
MapReduce架構 1.X
JobTracker:負責接收客戶作業提交,負責任務到作業節點上運行,檢查作業的狀態
TaskTracker:由JobTracker指派任務,定期向JobTracker匯報狀態,在每一個工作節點上永遠只會有一個TaskTracker
image.png

MapReduce2.X架構

ResourceManager:負責資源的管理,負責提交任務到NodeManager所在的節點運行,檢查節點的狀態
NodeManager:由ResourceManager指派任務,定期向ResourceManager匯報狀態
{{image.png(uploading...)}}

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/126017.html

相關文章

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<