1.1 大數(shù)據(jù)平臺優(yōu)勢
橫向擴展:大數(shù)據(jù)技術(shù)出現(xiàn)之初所要解決的問題就是數(shù)據(jù)存儲與計算,近年來隨著數(shù)據(jù)量產(chǎn)生速度越來越快,傳統(tǒng)平臺存儲與計算能力遇到瓶頸,而大數(shù)據(jù)平臺是分布式架構(gòu),理論上是可以無限擴展的,所以其能更好的適應(yīng)時代的發(fā)展。
資源共享:企業(yè)通過使用單一集群,可以化零為整,整合所有可用服務(wù)器資源,并統(tǒng)一對外提供所有的能力,可以實現(xiàn)細(xì)粒度的資源調(diào)度機制。并且只需維護一個集群,降低運維成本。
數(shù)據(jù)共享:使用單一存儲架構(gòu),可以將企業(yè)內(nèi)部所有數(shù)據(jù)集中在一個集群中,方便進行各種業(yè)務(wù)數(shù)據(jù)的整合使用,從而充分利用大數(shù)據(jù)技術(shù)全量數(shù)據(jù)分析的優(yōu)勢。
服務(wù)共享:通過統(tǒng)一服務(wù)架構(gòu),可將一套統(tǒng)一服務(wù)設(shè)計規(guī)則應(yīng)用到所有的服務(wù)實現(xiàn)上,例如一張表數(shù)據(jù)可以以文件形式共享也能以接口形式接口進行共享,我們進行統(tǒng)一之后各個部門可以以相同方法進行調(diào)用使用,避免煙囪式架構(gòu),間接減少重復(fù)開發(fā)成本。
安全保障:通過統(tǒng)一安全架構(gòu),在單一集群架構(gòu)基礎(chǔ)上實現(xiàn)細(xì)粒度的資源隔離,對不同人員進行不同程度的授權(quán)。
1.2 大數(shù)據(jù)平臺需要具備的能力
集群監(jiān)控與管理:毫無疑問集群是需要能夠進行統(tǒng)籌的管理及監(jiān)控的能力,否則運維團隊在做運維時將無從下手。
資源接入:數(shù)據(jù)是一個企業(yè)的核心資源,我們對業(yè)務(wù)模型的建立,分析,挖掘都需要建立在原始數(shù)據(jù)之上,而這些數(shù)據(jù)來源多(日志,關(guān)系數(shù)據(jù)庫,爬蟲等),類型雜(結(jié)構(gòu)化,半結(jié)構(gòu)化,非結(jié)構(gòu)化數(shù)據(jù)),體量大,所以大數(shù)據(jù)平臺需要能夠?qū)痈鞣N來源及各種類型的海量數(shù)據(jù)。
數(shù)據(jù)存儲及查詢:數(shù)據(jù)接入之后,就需要根據(jù)不同的應(yīng)用場景進行存儲,例如關(guān)系型數(shù)據(jù)模型,非關(guān)系型數(shù)據(jù)模型,文檔數(shù)據(jù)模型,大數(shù)據(jù)平臺需要能夠提供不同的存儲模型及不同的查詢手段。
數(shù)據(jù)計算:根據(jù)不同的應(yīng)用場景會有不同的計算要求,簡單的可以分為離線計算和實時計算,機器學(xué)習(xí),多維分析。在數(shù)據(jù)對時效性要求不高且數(shù)據(jù)量大的情況下可以選擇離線計算。例如報表之類的需求。但對于時效性要求比較高的場景,例如銀行的風(fēng)險控制,就需要選擇實時計算模型。機器學(xué)習(xí)可以使用大數(shù)據(jù)平臺的全量數(shù)據(jù)進行模型訓(xùn)練,常用于預(yù)測,預(yù)警,推薦等應(yīng)用場景。例如今日頭條。由于大數(shù)據(jù)平臺數(shù)據(jù)的互通性使得可以從多個維度對某一事件進行分析,例如從商品,客戶,價格,商品折扣,商品促銷等多個維度進行分析,從而得出某一商品近期銷售額是增加了還是減少了,商品的主要消費群體是什么年齡段的。
大數(shù)據(jù)平臺安全管理:需要具備用戶管理與訪問控制能力。
任務(wù)管理及調(diào)度:我們開發(fā)的數(shù)據(jù)抽取,離線計算還是實時計算等都需要以任務(wù)形式提交到調(diào)度系統(tǒng),可以進行任務(wù)追蹤,日志查詢key,執(zhí)行周期性要求等。
2.1 hadoop生態(tài)圈
2.2 大數(shù)據(jù)平臺-HDFS
HDFS是一款分布式文件系統(tǒng),能夠存儲在廉價的機器上,能夠存儲海量的文件數(shù)據(jù),同時擁有完善的錯誤恢復(fù)機制,其是GFS(谷歌分布式文件系統(tǒng))的開源實現(xiàn),可以說HDFS是整個平臺架構(gòu)里的基石。
HDFS是Master/Slave架構(gòu),一個HDFS集群式由一個NameNode和一定數(shù)目的DataNode組成,NameNode是一個中心服務(wù)器,負(fù)責(zé)管理文件系統(tǒng)的命名空間以及客戶端對文件的訪問,例如打開,關(guān)閉,關(guān)閉,重命名文件或目錄,負(fù)責(zé)確定數(shù)據(jù)塊存儲到具體哪個DataNode節(jié)點,DataNode負(fù)責(zé)處理文件系統(tǒng)客戶端的讀寫請求,在NameNode的統(tǒng)一調(diào)度下進行數(shù)據(jù)塊的創(chuàng)建,刪除,復(fù)制等。架構(gòu)如下:
2.3大數(shù)據(jù)平臺-Zookeeper
Zookeeper是一款分布式協(xié)同管理框架,如果您在學(xué)習(xí)hadoop集群時,肯定會問既然多臺服務(wù)器協(xié)同工作,那么如何實現(xiàn)配置同步及Master與Slave之間的通信呢,也就是Slave如何向Master“匯報工作”和Master如何向Slave“分配工作”的呢,如果Master不可用時,通常需要從Slave中在選舉出一個節(jié)點作為Master,那么這些問題都是如何解決的呢?答案就是Zookeeper,Zookeeper自身擁有高度可靠性,可擴展性和容錯性,能夠提供統(tǒng)一命名服務(wù),分布式鎖,分布式隊列,選舉,配置同步,心跳檢查等功能,一言以概之就是Zookeeper幫助我們管理集群內(nèi)部瑣碎的事情。邏輯架架構(gòu)圖如下(也是主從模式):
2.4大數(shù)據(jù)平臺-Hbase
Hbase是構(gòu)建在HDFS之上的,分布式的非關(guān)系型數(shù)據(jù)庫,基于谷歌BigTable論文的開源實現(xiàn)。一張表能支撐數(shù)十億行和數(shù)百萬列,還能體現(xiàn)其快速查詢的能力。從設(shè)計上來說其是由三類服務(wù)構(gòu)成的Master/Slave架構(gòu)。Master進程負(fù)責(zé)Region(按照RowKey將表分割成若干個塊)的分配,DDL這類操作,Region-Server進程負(fù)責(zé)數(shù)據(jù)的讀寫,底層數(shù)據(jù)存儲和集群協(xié)同管理由HDFS、Zookeeper管理,表數(shù)據(jù)底層也是一個個的存儲在HDFS上的Hfile文件。
數(shù)據(jù)模型:RowKey相當(dāng)于關(guān)系型數(shù)據(jù)庫中的主鍵,唯一的。ColumnFamily相當(dāng)于子表的概念,每個Column都必須屬于某個ColumnFamily;Column真正定義數(shù)據(jù)的屬性字段。Version概念,每次修改數(shù)據(jù)就會產(chǎn)生一個新的版本,Hbase默認(rèn)存儲數(shù)據(jù)的三個版本(是可以更改的),查詢默認(rèn)是最新版本。數(shù)據(jù)模型如下:
WAL:預(yù)寫日志是HDFS上的一個文件,是一個容災(zāi)策略,Hbase為了提高寫性能,在寫入數(shù)據(jù)時并不直接寫入磁盤中,而是將數(shù)據(jù)直接保存在內(nèi)存中,但內(nèi)存大小畢竟是有限的,所以當(dāng)數(shù)據(jù)存儲達到某個閾值時就將數(shù)據(jù)寫入磁盤并清空內(nèi)存,但是數(shù)據(jù)存放在內(nèi)存(MemStore)中并不安全,所以Hbase采用了預(yù)寫日志方式,當(dāng)數(shù)據(jù)丟失時可以根據(jù)日志恢復(fù)數(shù)據(jù),數(shù)據(jù)寫入日志就算寫入成功,并且寫入日志是對磁盤的順序?qū)懭?,所以寫入速度是非??斓?,正是這種模式既保證了寫入速度,也保證了可靠性。
BlockCache:是一種讀緩存,客戶端讀取數(shù)據(jù)會先從該緩存塊中查找數(shù)據(jù)。Hbase會將一次文件查找的數(shù)據(jù)塊緩存到內(nèi)存中,以便后續(xù)同一個查找請求。
MemStore:是一種寫緩存,如上所述,數(shù)據(jù)寫入并不是直接寫入磁盤,而是先寫入到內(nèi)存中。
Hfile:是最終數(shù)據(jù)的存儲載體,本質(zhì)上就是HDFS文件。
2.5 大數(shù)據(jù)平臺-YARN
YARN是一款集群資源調(diào)度框架,其是從Mapreduce中獨立出來的,Hadoop1.X時mapreduce不僅充當(dāng)計算框架角色也擔(dān)當(dāng)資源管理角色,從Hadoop2.X后就將資源調(diào)度功能獨立出來,也就是我們要說的YARN。試想一下我們集群中部署了Hbase,Hive,Spark等多個大數(shù)據(jù)組件,每個組件設(shè)計之初都有自己的一套資源調(diào)度系統(tǒng)來管理資源的分配,他們都認(rèn)為自己應(yīng)該使用100%的服務(wù)器資源,但是資源總量就那么多;或者資源分配不合理等問題。所以我們就需要一個統(tǒng)一的資源調(diào)度框架來進行統(tǒng)一管理。而YARN就是這樣一種框架。
資源模型:YARN利用Container對象作為資源的基本單位,包括資源名稱,內(nèi)存和CPU。Container將資源進行了隔離,每個應(yīng)用都可以通過ApplicationMaster向ResourceManager申請資源,例如某個Spark計算任務(wù)申請到了6個Container資源。
ResourceManager是全局資源管理器,負(fù)責(zé)整個集群的資源分配。
ApplicationMaster負(fù)責(zé)跟ResourceManager進行通信,以生申請所需要的資源,例如Spark的Driver進程。
NodeManager是每個節(jié)點上的資源管理器,負(fù)責(zé)自己所在服務(wù)器的資源利用的整個生命周期。
YARN工作過程:
1.用戶向YARN提交應(yīng)用程序。
2.ResourceManager為該應(yīng)用找到一個可用的NodeManager并分配第一個Container,然后再Container中啟動這個ApplicationMaster。
3.ApplicationMaster向ResourceManager進行注冊,并且采用輪詢方式向ResourceManager申請資源,申請到資源后與對應(yīng)的NodeManager進行通信要求他設(shè)置運行環(huán)境。
4.任務(wù)開始運行并向ApplicationMaster匯報自己的狀態(tài)和進度。
5.任務(wù)執(zhí)行完畢后,ApplicationMaster向ResourceManager注銷并關(guān)閉自己。
2.6 大數(shù)據(jù)平臺-Spark
Spark是一款分布式內(nèi)存計算模型,其計算是基于內(nèi)存的,所以計算速度較其他計算引擎是很快的,Spark基于一套統(tǒng)一的數(shù)據(jù)模型(RDD)和編程模型(Trans/Action)之上,構(gòu)建出SparkSQL,SparkStreaming,Mlib,GraphX等分支。如圖:
如上圖可以看出SparkCore是Spark的核心,Spark部署可以支持StandAlone模式,Yarn模式,Mesos模式,StandAlone模式就是利用Spark自己的資源管理器,Yarn之前說了是一種通用的資源管理框架,官方推薦的是Mesos模式,但是工作中一般是YARN模式,個人理解可能是YARN模式比較通用吧。
Spark生態(tài)圈如下:
從上圖可以看出Spark支持Scala,Java,R、Python語言,每種語言都有相應(yīng)的庫進行支持,但是推薦使用Scala語言,兼容性比較好,因為Spark就是用Scala語言編寫的。SparkSQL主要用于批處理作業(yè),實際工作中90%以上工作都是SparkSQL完成的,SparkStreaming子模塊主要用于對時效性要求比較高的場景,但其是一種近實時,本質(zhì)上也是一種批處理,只不過是根據(jù)時間分成足夠小的批。Spark是一種計算引擎,可以看出其數(shù)據(jù)源可以來自Flume,Kafka,HDFS,文件等,經(jīng)過計算后可以存儲到Hbase,關(guān)系型數(shù)據(jù)庫,HDFS等中,一般流處理都會涉及到Kafka,Spark。
Spark架構(gòu):
SparkContext是Spark編程的入口,一個JVM中只能有一個SparkContext,如上圖的DriverApplication就相當(dāng)于程序中的Main函數(shù),其向資源管理器申請資源,資源申請后會反饋給Driver進程,之后Driver就可以直接與Executor交流了,Driver分配任務(wù)給Executor,實際執(zhí)行任務(wù)的是Executor,Task是最小的執(zhí)行單元,其數(shù)量由分區(qū)數(shù)量決定。這里只是簡單的說明了一下,如果有興趣,可以理解一下DAG,Stage的劃分,Stage又是一組可以并行執(zhí)行的Task集合。
Spark是一個很重要的組件,其東西也很多,不是一下就能概括完的,像寬窄依賴,存儲級別,算子分類,Stage劃分,廣播變量,累加器等,如果有興趣還需要更深入的學(xué)習(xí)。
除了這些介紹的大數(shù)據(jù)組件,還有很多,向Kafka,F(xiàn)lume,Hive,Sqoop等,也都是比較常用的。
Ambari是一款用于部署、管理和監(jiān)控Hadoop集群服務(wù)的開源系統(tǒng),實現(xiàn)了以下功能:1.安裝hadoop集群,實現(xiàn)了界面化的安裝過程。2.管理hadoop集群,提供了啟動,停止等功能。3.監(jiān)控hadoop集群,監(jiān)控hadoop集群的健康狀態(tài),提供了一套健康指標(biāo)體系收集監(jiān)控數(shù)據(jù),和一套預(yù)警礦漿,可以實現(xiàn)預(yù)定指標(biāo)的預(yù)警功能。Amari也是Master/Slave架構(gòu),由一個Ambari-Server和多個Ambari-Agent組成,但是Ambari-Server安裝成功之后也可以通過界面方式來安裝Ambari-Agent,支持以下操作系統(tǒng):Redhat6/7,CentOS6/7,Ubuntu12/14等。接下來我們以在Redhat7上利用Ambari安裝HDP為例(有興趣的也可以了解下在阿里云上利用ClouderaManager安裝CDH)。
一、準(zhǔn)備工作:
1.需要安裝JDK1.7或者1.8,;2.安裝Python2.6以上版本,因為會通過python調(diào)用一些命令腳本。
二、安裝包下載:
wgethttp://public-repo-1.hortonworks.com/ambari/centos6/2.x/updates/2.4.0.1/AMBARI-2.4.0.1-centos6.tar.gz
wgethttp://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.5.0.0/HDP-2.5.0.0-centos6-rpm.tar.gz
wgethttp://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6/HDP-UTILS-1.1.0.21-centos6.tar.gz
三、安裝Apache服務(wù)器并啟動:
yum install httpd
/etc/init.d/httpd start
四、解壓:
tar -zxvf HDP-2.5.0.0-centos6-rpm.tar.gz -C /var/www/html/hdp
tar -zxvf HDP-UTILS-1.1.0.21-centos6.tar.gz -C/var/www/html/hdp/HDP-UTILS-1.1.0.21
tar -zxvf AMBARI-2.4.0.1-centos6.tar.gz -C /var/www/html/ambari
五、搭建本地yum源倉庫:
創(chuàng)建ambari.repo文件
[Ambari-2.4.0.1]
name=Ambari-2.4.0.1
baseurl=http://server/AMBARI-2.4.0.1/centos6/2.4.0.1-1
gpgcheck=1
gpgkey=http://server/AMBARI-2.4.0.1/centos6/2.4.0.1-1/RPM-GPG-KEY/RPM-GPG-KEY-Jenkins
enabled=1
priority=1
創(chuàng)建hdp.repo文件
HDP-2.5.0.0]
name=HDP-2.5.0.0
baseurl=http://server/home/www/HDP/centos6
path=/
enabled=1
gpgcheck=0
[HDP-UTILS-1.1.0.21]
name=Hortonworks Data Platform Version - HDP-UTILS-1.1.0.21
baseurl=http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6
gpgcheck=1
enabled=1
priority=1
最后將這兩個文件復(fù)制到所有準(zhǔn)備安裝hadoop組件的服務(wù)器上,yumrepolist查看,如果出現(xiàn)Ambari-2.4.0.1和HDP-2.5.0.0這說明配置成功。
六、關(guān)閉防火墻和SELinux
七、配置主機表
在每臺服務(wù)器上vim/etc/hosts:添加如下信息:
IP1 hostname1
IP2 hostname2
..............
八、安裝postgresql-server,因為Ambari需要使用他存儲元數(shù)據(jù)。
yum install postgresql-server
九、執(zhí)行ambari-serversetup進行相關(guān)配置
首先會先檢查是否已經(jīng)禁止了SELinux,如果之前配置好就沒什么問題,
還有一個JDK的配置,我安裝時候需要使用標(biāo)準(zhǔn)oraclejdk1.7,沒有選擇自己安裝的JDK。這個可能不同系統(tǒng)不一樣,需要注意一下
十、安裝Ambari-server
yum install ambari-server
十一、啟動
默認(rèn)端口為8080,可以通過vim/etc/ambari-server/conf/ambari.properties
client.api.port=XXX進行修改。
執(zhí)行ambari-serverstart,打開瀏覽器輸入http://ip:port/可以進入到Ambari界面,接下來hadoop組件都可以通過界面方式安裝,就很簡單了。
4.1 數(shù)據(jù)抽取
數(shù)據(jù)開發(fā)過程首先就是數(shù)據(jù)的抽取,確定使用何種組件或工具來進行抽取,常用抽取方式有Sqoop,flume,kettle等,如果不考慮成本也可以自己寫程序?qū)崿F(xiàn)。
sqoop是一款開源的工具,主要用于關(guān)系型數(shù)據(jù)庫和HDFS之間數(shù)據(jù)的傳輸,雖然簡單但是挺常用的。
flume架構(gòu)如下:
agent相當(dāng)于數(shù)據(jù)包,包括數(shù)據(jù)來源source,數(shù)據(jù)緩沖channel,數(shù)據(jù)輸出目標(biāo)三個部分,agent可以任意組合,如圖:
flume支持的數(shù)據(jù)源有kafka,netcat,file,httpSource等,支持的sink有HDFS,Hive,Kafka等,具體可以參考中文文檔:https://flume.liyifeng.org/,如圖
腳本配置案例(將文件中的數(shù)據(jù)抽取到kafka中):
# 初始化
test.sources = testSource
test.channels = testChannel
test.sinks = testSink
# 配置channel
test.channels.testChannel.type = file
test.channels.testChannel.checkpointDir = /var/flume/checkpoint/test
test.channels.testChannel.dataDirs = /var/flume/data/test
#配置source
test.sources.testSource.type = spooldir
test.sources.testSource.deserializer = LINE
test.sources.testSource.deserializer.maxLineLength = 6400
test.sources.testSource.spoolDir = /events/input/intra/test
test.sources.testSource.includePattern =test_[0-9]{4]-[0-9]{2]-[0-9]{2].csv
test.sources.testSource.channels = testChannel
# 定義sink
test.sinks.testSink.type = org.apache.flume.sink.kafka.KafkaSink
test.sinks.testSink.batchSize = 640
test.sinks.testSink.brokerList = sandbox-hdp.hortonworks.com:6667
test.sinks.testSink.topic = test
test.sinks.testSink.channel = testChannel
Kettle是一個開源的ETL工具,不僅包括數(shù)據(jù)抽取,還包括數(shù)據(jù)清洗,任務(wù)調(diào)度等功能,有興趣可以看“使用PDI構(gòu)建開源ETL解決方案”這本書。kettle增量抽取數(shù)據(jù)案例:
4.2 數(shù)據(jù)建模、清洗
數(shù)據(jù)抽取過來之后就需要根據(jù)應(yīng)用場景進行建模,數(shù)倉的建設(shè),常用維度建模,維度建模關(guān)鍵步驟:1.確定業(yè)務(wù)過程,比如資產(chǎn)到貨。2.聲明業(yè)務(wù)過程的粒度,例如到貨清單每一行。3.確定維度,例如到貨日期,供應(yīng)商,資產(chǎn),部門等。4.確定事實,比如資產(chǎn)價格,數(shù)量,運費,折扣等可度量的事實。建模過程一般需要業(yè)務(wù)部門的參與,首先確定業(yè)務(wù)活動價值鏈,比如:
之后創(chuàng)建業(yè)務(wù)總線矩陣,比如:
之后是創(chuàng)建高層模型,例如:
最后創(chuàng)建維度模型,例如
關(guān)于數(shù)倉建設(shè),可以看”數(shù)據(jù)倉庫工具箱維度建模權(quán)威指南“這本書,里面有大量案例來解釋維度建模方法,緩慢變化維,事務(wù)事實表,周期事實表,累加事實表等比較重要的概念。
數(shù)據(jù)模型創(chuàng)建完成就需要對數(shù)據(jù)進行清洗轉(zhuǎn)換,對于結(jié)構(gòu)化數(shù)據(jù)一般使用SQL腳本,KETTLE也是支持SQL腳本的,公司的數(shù)據(jù)平臺產(chǎn)品也是集成KETTLE的,使用公司的產(chǎn)品不僅可以利用kettle進行數(shù)據(jù)開發(fā),還可以進行元數(shù)據(jù)管理,數(shù)據(jù)共享等,這是單純使用KETTLE做不到的。
但是對于非結(jié)構(gòu)化數(shù)據(jù),使用SQL就很困難了,這時候還需要使用代碼實現(xiàn)了,例如日志數(shù)據(jù),爬蟲抓取的數(shù)據(jù),其結(jié)構(gòu)都是很亂的,使用SQL一般不能很好的進行處理。例如像下面這樣:
defprice_clean(extendedData:String,promotion_price:String,priceText:String,coupon:String,shop_coupon:String):String={
var price = ""
//shop_coupon字段
var shop_coupon_price:Double = 0.0
if (!extendedData.equals("-1")) {
val nObject: JSONObject = JSON.parseObject(extendedData)
val string1: String =nObject.getOrDefault("firePhoenixExtending", "-1").toString
if (string1 != "-1") {
val nObject2: JSONObject = JSON.parseObject(string1)
val money = nObject2.getOrDefault("money","-1").toString
if (money != (-1)) {
price = money
}
}
}
else {
price = pricedeal(check_price(promotion_price,priceText))
if (!shop_coupon.equals("-1")){
val array1: JSONArray = JSON.parseArray(shop_coupon)
for (i <- 0 to array1.size()-1){
val str: String = array1.get(i).toString
if (str.contains("滿")&&str.contains("元")){
val p1 = str.split("滿")(1).split("元")(0)
val p2 = str.split("省")(1).split("元")(0)
if(isIntByRegex(p1)&&isIntByRegex(p2)){
//判斷是否滿足條件
if (pricedeal(price).toDouble>=p1.toDouble){
shop_coupon_price = p2.toDouble
}
}
}
}
price = (price.toDouble - shop_coupon_price).toString
}
if (!coupon.equals("-1")){
val p1 = coupon.split("滿")(1).split("減")(0)
val p2 = coupon.split("滿")(1).split("減")(1)
if (pricedeal(price).toDouble>=p1.toDouble){
price = (pricedeal(price).toDouble -(pricedeal(price).toDouble/p1.toDouble).toInt*p2.toDouble).toString
}
}
}
price
//math.round(2.3)
}
4.3 案例分享
Lambda架構(gòu)是最常用的大數(shù)據(jù)架構(gòu),它是流批分離的,如下:
下面是一個案例分享,數(shù)據(jù)來源使用Kafkaproducer生產(chǎn)數(shù)據(jù),經(jīng)過SparkStreaming之后入到Mysql數(shù)據(jù)庫。用于模擬上述流處理部分:
項目結(jié)構(gòu)如下:
pom.xml依賴如下:
注意一下scala版本,需與本地scala版本保持一致,否則會報錯application.properties配置文件如下:
mysql.url=jdbc:mysql://127.0.0.1:3306/sakila
mysql.username=root
mysql.password=XXX
kafka.bootstrap.servers=10.5.65.83:9092
kafka.input.topic=actor
input.file=E:shareactor.csv
配置文件中定義了mysql信息及Kafka信息,input.file是生產(chǎn)數(shù)據(jù)的來源。
PropertiesUtil.java工具類:
package com.shsnc.utils;
public class PropertiesUtil {
public static java.util.Properties get_properties(Stringfilename) throws IOException {
InputStream in =PropertiesUtil.class.getResourceAsStream(filename);
java.util.Properties props = new java.util.Properties();
try{
InputStreamReader inputStreamReader = newInputStreamReader(in, "UTF-8");
props.load(inputStreamReader);
}catch (IOException e){
e.printStackTrace();
}
return props;
}
}
工具類用于加載配置文件
MyProduce.java類用于生產(chǎn)數(shù)據(jù)到kafka:
package com.shsnc.kafka;
import com.shsnc.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Properties;
public class MyProduce {
public void produce() throws Exception {
Properties pro =PropertiesUtil.get_properties("/application.properties");
//assign broker url
String brokerUrl =pro.getProperty("kafka.bootstrap.servers");
String topicName = pro.getProperty("kafka.input.topic");
String fileName = pro.getProperty("input.file");
//create an instance for properties to access theproducer configs
Properties props = new Properties();
props.put("bootstrap.servers", brokerUrl);
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//create producer
Producer
try {
//file
BufferedReader br = new BufferedReader( newFileReader(new File(fileName)) );
try {
long pos = 0, count = 0;
String text = br.readLine();
while ( text != null ) {
pos += text.length() + 2;
producer.send( new ProducerRecord
count++;
text = br.readLine();
}
System.out.println( Long.toString(count) + "messages sent." );
}
finally {
br.close();
}
}
catch ( java.lang.Exception e ) {
e.printStackTrace();
}
finally {
producer.close();
}
}
}
ToMysql.java用于將topic數(shù)據(jù)存入到mysql的actor表中。
package com.shsnc.utils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
public class ToMysql {
public static int write( ConsumerRecord
Properties pro =PropertiesUtil.get_properties("/application.properties");
//the jdbc url
String jdbcUrl = pro.getProperty("mysql.url");
//the user name
String user = pro.getProperty("mysql.username");
//the password
String password = pro.getProperty("mysql.password");
//the # of records puts
int numInserts = 0;
//check
if ( jdbcUrl == null || jdbcUrl.isEmpty() ) {
//error out
throw new Exception("The jdbc-url is notinitialized.");
}
Class.forName("com.mysql.jdbc.Driver");
//the connection object
Connection conn = DriverManager.getConnection(jdbcUrl, user,password);
try {
//flags
long passHead = 0;
//parse event record
String[] elements = records.value().split(",",-1 );
String sql = "insert intoactor1(actor_id,first_name,last_name,last_update)values("+elements[0]+","+elements[1]+","+elements[2]+","+elements[3]+")";
System.out.println(sql);
//String sql = "insert intoactor1(actor_id,first_name,last_name,last_update)values(1,b,c,2006-02-15 04:34:33)";
CallableStatement stmt = conn.prepareCall(sql);
try {
stmt.execute();
System.out.println("插入成功!");
numInserts++;
}
finally {
//close
stmt.close();
}
}
finally {
//close the connection
conn.close();
}
return numInserts;
}
}
KafkaSparkStream.java是通過spark實時讀取kafka數(shù)據(jù)并調(diào)用toMysql將數(shù)據(jù)存入到Mysql中。
package com.shsnc.stream;
import com.shsnc.utils.PropertiesUtil;
import com.shsnc.utils.ToMysql;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import java.io.IOException;
import java.util.*;
public class KafkaSparkStream {
public static void getStream(JavaStreamingContext jsc) throwsIOException, InterruptedException {
Properties pro =PropertiesUtil.get_properties("/application.properties");
Map
kafkaParams.put("bootstrap.servers",pro.getProperty("kafka.bootstrap.servers"));
kafkaParams.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "test");
kafkaParams.put("auto.offset.reset", "latest");
//kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", "false");
Collection
JavaInputDStream
KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.
);
System.out.println("stream");
JavaDStream jds = stream.map(consumerRecord ->ToMysql.write(consumerRecord));
jds.print();
jds.count();
}
}
Main.java是主函數(shù)。
package com.shsnc;
import com.shsnc.stream.KafkaSparkStream;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class Main {
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().
setAppName("SparkStreamingOnKafkaDirected").setMaster("local[*]");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(10));
KafkaSparkStream.getStream(jsc);
jsc.start();
jsc.awaitTermination();
}
}
4.4 測試結(jié)果
Mysql目標(biāo)表為actor1,有actor_id,first_name,last_name,last_update個字段
測試如下:
起初目標(biāo)表為空:
kafka中topic中也無新數(shù)據(jù)產(chǎn)生:
現(xiàn)在啟動主函數(shù):
可見并沒有數(shù)據(jù)插入到mysql,現(xiàn)在生產(chǎn)10條數(shù)據(jù)到kafka:
可見kakfa中已經(jīng)新產(chǎn)生了10條數(shù)據(jù),那么這新增的10條數(shù)據(jù)是否被spark讀取并存入到了mysql呢?如圖:
這是控制臺日志,可見已經(jīng)數(shù)據(jù)已經(jīng)被spark讀取到并插入到mysql中,再驗證mysql的actor1表:
可見mysql中確實插入成功了,如果之后kafka中有新數(shù)據(jù)產(chǎn)生,最終都會被sparkstreaming處理并存儲到mysql中,不過例子中沒有偏移量的管理,如果程序終止(宕機)就可能導(dǎo)致數(shù)據(jù)的丟失。
使用kettle的批處理案例如下:
批處理中也包含了數(shù)據(jù)抽取,數(shù)據(jù)清洗轉(zhuǎn)換,數(shù)據(jù)裝載的過程。Kappa架構(gòu)去除了批處理,只保留了流處理,而Flink可以實現(xiàn)批流一體,感覺Flink可以很好的實現(xiàn)Kappa架構(gòu)。
數(shù)據(jù)的最終目的還是應(yīng)用,數(shù)據(jù)主要用于數(shù)據(jù)分析(報表,應(yīng)用系統(tǒng)等)、數(shù)據(jù)共享、業(yè)務(wù)創(chuàng)新(反哺業(yè)務(wù))、機器學(xué)習(xí)(預(yù)測、推薦系統(tǒng)等)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/130062.html
摘要:年月日日,由高可用架構(gòu)技術(shù)社區(qū)聯(lián)合麥思博有限公司共同主辦的全球互聯(lián)網(wǎng)架構(gòu)大會在上海光大會展中心成功舉行。至此,全球互聯(lián)網(wǎng)架構(gòu)大會完美落幕。 showImg(https://segmentfault.com/img/bV1mnC?w=800&h=533); 2017年12月22日-23日,由高可用架構(gòu)技術(shù)社區(qū)聯(lián)合麥思博(msup)有限公司共同主辦的 GIAC全球互聯(lián)網(wǎng)架構(gòu)大會在上海光大會...
摘要:年月日日,由高可用架構(gòu)技術(shù)社區(qū)聯(lián)合麥思博有限公司共同主辦的全球互聯(lián)網(wǎng)架構(gòu)大會在上海光大會展中心成功舉行。至此,全球互聯(lián)網(wǎng)架構(gòu)大會完美落幕。 showImg(https://segmentfault.com/img/bV1mnC?w=800&h=533); 2017年12月22日-23日,由高可用架構(gòu)技術(shù)社區(qū)聯(lián)合麥思博(msup)有限公司共同主辦的 GIAC全球互聯(lián)網(wǎng)架構(gòu)大會在上海光大會...
摘要:月日北京站在天使匯咖啡如期舉行。董偉以非常接地氣的方式介紹什么是云計算以及云計算平臺任務(wù)調(diào)度系統(tǒng),包括調(diào)度系統(tǒng)中有哪些任務(wù)和需求點以及框架設(shè)計。通過幾個維度介紹云計算平臺資源調(diào)度是如何進化等。 8 月 29 日 SegmentFault D-Day 北京站在天使匯 DotGeek 咖啡如期舉行。本次沙龍邀請到 API Cloud CTO 鄒達、QingCloud 高級工程師 Ray、靈...
摘要:北京時間月日月日,由和中國國際人才交流基金會聯(lián)合主辦的第七屆全球軟件案例研究峰會簡稱在北京國家會議中心圓滿落幕。本屆峰會,來自阿里美團百度平安銀行等企業(yè)的講師分別從企業(yè)轉(zhuǎn)型及研發(fā)效能方面分享敏捷和的實踐細(xì)節(jié)和操作經(jīng)驗。 北京時間11月30日-12月3日,由msup和中國國際人才交流基金會聯(lián)合主辦的第七屆全球軟件案例研究峰會(簡稱:TOP100summit)在北京國家會議中心圓滿落幕。T...
閱讀 1353·2023-01-11 13:20
閱讀 1699·2023-01-11 13:20
閱讀 1211·2023-01-11 13:20
閱讀 1902·2023-01-11 13:20
閱讀 4161·2023-01-11 13:20
閱讀 2749·2023-01-11 13:20
閱讀 1397·2023-01-11 13:20
閱讀 3664·2023-01-11 13:20