国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Flink數(shù)據(jù)廣播使用總結(jié)

IT那活兒 / 2472人閱讀
Flink數(shù)據(jù)廣播使用總結(jié)
一.為什么要使用廣播


在Flink使用場(chǎng)景中,常常需要獲取外部配置或規(guī)則數(shù)據(jù)后進(jìn)行計(jì)算或規(guī)則匹配,同時(shí)當(dāng)外部數(shù)據(jù)發(fā)生變化時(shí),F(xiàn)link中程序也要保持更新,有以下幾種方式可以實(shí)現(xiàn)


1.預(yù)加載

在算子的open()方法中讀取MySQL或其他存儲(chǔ)介質(zhì),獲取全量維表信息比如在算子RichMapFunction的open()方法中獲取全部數(shù)據(jù),然后在算子中進(jìn)行使用,這種方法的缺點(diǎn)是如果外部數(shù)據(jù)更新了Flink是沒法知道的,這就需要在開啟一個(gè)定時(shí)任務(wù)定時(shí)從MySQL中獲取最新的數(shù)據(jù)。


2.外部查詢

數(shù)據(jù)不需要存儲(chǔ),僅需要用到外部數(shù)據(jù)的時(shí)候去進(jìn)行查詢,可以保證查詢到的數(shù)據(jù)是最新的,但是對(duì)于吞吐量較高的場(chǎng)景,可能與外部(比如MySQL)交互就變成了 Flink任務(wù)的瓶頸,雖然可以設(shè)置為異步I/O的形式進(jìn)行交互優(yōu)化,但優(yōu)化程度一般有限。


3.本地緩存

需要設(shè)置過期時(shí)間或者定時(shí)更新數(shù)據(jù),當(dāng)數(shù)據(jù)到達(dá)過期時(shí)間后從新從外部獲取,或者定時(shí)從外部撈取數(shù)據(jù)進(jìn)行更新,不能在外部數(shù)據(jù)發(fā)生變動(dòng)時(shí),及時(shí)更新到Flink程序中。

預(yù)加載和本地緩存難以應(yīng)對(duì)當(dāng)外部數(shù)據(jù)發(fā)生變化時(shí),數(shù)據(jù)實(shí)時(shí)在Flink中保持更新。


二.什么是廣播


類似于全局性共享的數(shù)據(jù),詳見官方文檔

https://flink.apache.org/2019/06/26/broadcast-state.html

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/stream/state/broadcast_state.html


廣播的優(yōu)勢(shì)

廣播變量創(chuàng)建后,它可以運(yùn)行在集群中的任何function上,而不需要多次傳遞給集群節(jié)點(diǎn),可以直接在內(nèi)存中拿數(shù)據(jù),避免了大量的shuffle,導(dǎo)致集群性能下降。我們可以把一個(gè)dataset或者不變的緩存對(duì)象(例如maplist集合對(duì)象等)數(shù)據(jù)集廣播出去,然后不同的任務(wù)在節(jié)點(diǎn)上都能夠獲取到,并在每個(gè)節(jié)點(diǎn)上只會(huì)存在一份,而不是在每個(gè)并發(fā)線程中存在。

如果不使用broadcast,則在每個(gè)節(jié)點(diǎn)中的每個(gè)任務(wù)中都需要拷貝一份dataset數(shù)據(jù)集,比較浪費(fèi)內(nèi)存(也就是一個(gè)節(jié)點(diǎn)中可能會(huì)存在多份dataset數(shù)據(jù))。廣播變量,可以借助下圖輔助理解。


三.廣播的使用


根據(jù)廣播使用場(chǎng)景將廣播的類型分為廣播變量和廣播流(其實(shí)廣播原理是一樣的)。


1.廣播變量

將廣播的數(shù)據(jù)作為一個(gè)整體或?qū)ο髲V播,比如從MySQL中一次獲取全部數(shù)據(jù),然后廣播出去,因?yàn)閿?shù)據(jù)在MySql中,如果MySql中某條記錄發(fā)生變動(dòng),F(xiàn)link的souce是沒法知道,也不會(huì)廣播。所以只能在souce中定時(shí)從MySql中獲取全部數(shù)據(jù),然后廣播更新。

示例數(shù)據(jù)格式:

kafka源流數(shù)據(jù),只有itemid,沒有ip和port
{"host":"orcl", "itemid":"7875", "value":1}
{"host":"orcl2", "itemid":"7876", "value":2}
規(guī)則數(shù)據(jù)集在MySql,itemid關(guān)聯(lián)ip和port
itemid  ip              port
7875
192.168.199.105 1521
7876
192.168.199.106 1526

自定義MySql source, 定時(shí) 從Mysql獲取全部數(shù)據(jù)

@Override
public void run(SourceContext>> ctx) {
try {
 while (isRunning) {
  HashMap> output = new HashMap<>();
  ResultSet resultSet = preparedStatement.executeQuery();
  //每隔60s獲取全部外部數(shù)據(jù)集
  while (resultSet.next()) {
   String itemid = resultSet.getString("itemid");
   String ip = resultSet.getString("ip");
   int port = resultSet.getInt("port");
   output.put(itemid, new Tuple2<>(ip, port));
  }
  ctx.collect(output);
  Thread.sleep(1000 * 60);
  }
 } catch (Exception ex) {
  log.error("從Mysql獲取配置異常...", ex);
 }
}

廣播代碼實(shí)現(xiàn):

public void processElement(Map value, ReadOnlyContext ctx, Collector> out) throws Exception {
   //從廣播中獲取全量數(shù)據(jù)
  ReadOnlyBroadcastState>>      broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
 //獲取全部規(guī)則數(shù)據(jù)進(jìn)行匹配

Map>  itemrules=  broadcastState.get(null);
  //規(guī)則數(shù)據(jù)集為空跳過
    if(itemrules==null) {
  
  return;
    }
    //事件流中的itemid
    Object itemidObj = value.get("itemid"); // value kafka流中數(shù)據(jù)獲取itemid
if (itemidObj == null) {
 return;
}
    Tuple2 itemruld = itemrules.get(itemidObj.toString());
    if(itemruld!=null){
  
  //匹配成功增加ip,port字段
   
 value.put("ip", itemruld.f0);
   
 value.put("port", itemruld.f1);
        out.collect(value);
    }
}
@Override
public void processBroadcastElement(HashMap> value, Context ctx, Collector> out) throws Exception {
   //數(shù)據(jù)全部更新
BroadcastState>> broadcastState = ctx
  .getBroadcastState(ruleStateDescriptor);
//每次更新全部規(guī)則數(shù)據(jù)
broadcastState.put(null, value);
    System.out.println("規(guī)則全部更新成功,更新item規(guī)則:" + value);
}

執(zhí)行結(jié)果:

//itemid=7875關(guān)聯(lián)ip=192.168.199.104
{host=orcl, itemid=7875, value=1, ip=192.168.199.104, port=1521}
//手動(dòng)將mysql中的ip=192.168.199.104改為ip=192.168.199.105,在source 休眠結(jié)束后將會(huì)更新數(shù)據(jù)
規(guī)則更新成功,更新item規(guī)則:{7875=(192.168.199.105,1521), 7876=(192.168.199.106,1526)}
//itemid=7875關(guān)聯(lián)ip=192.168.199.105
{host=orcl, itemid=7875, value=1, ip=192.168.199.105, port=1521}

這種方式和預(yù)加載很像,都是通過定時(shí)任務(wù)加載全部數(shù)據(jù),只不過是方法的位置不同,一個(gè)是在自定義source中設(shè)置休眠時(shí)間,另外一個(gè)是在算子的open方法中設(shè)置定時(shí)任務(wù),廣播變量的方式同樣無法做到數(shù)據(jù)修改后實(shí)時(shí)更新。


2.廣播流

當(dāng)數(shù)據(jù)來源于kafka時(shí),F(xiàn)link消費(fèi)kafka獲取流,將流數(shù)據(jù)存儲(chǔ)在廣播狀態(tài)中,稱之為廣播流,不同于廣播變量一次獲取全部數(shù)據(jù),廣播流是kafka新增一條記錄就將這條記錄存儲(chǔ)到廣播中,那廣播流如何實(shí)現(xiàn)外部數(shù)據(jù)的新增和更新?

kafka源流數(shù)據(jù),只有itemid,沒有ip和port
{"host":"orcl", "itemid":"7875", "value":1}
{"host":"orcl2", "itemid":"7876", "value":2}
規(guī)則數(shù)據(jù)集在kafka,itemid關(guān)聯(lián)ip和port
{"itemid":"7875","ip":"192.168.199.104","port":1521}
{"itemid":"7876","ip":"192.168.199.106","port":1526}

2.1 外部數(shù)據(jù)新增和修改記錄

// 廣播狀態(tài)底層結(jié)構(gòu)是Map結(jié)構(gòu)
//kafka中的數(shù)據(jù),flink消費(fèi)后存儲(chǔ)到廣播狀態(tài),在廣播狀態(tài)中以itemid為key進(jìn)行存儲(chǔ)
{"itemid":"7875","ip":"192.168.199.104","port":1521}
{"itemid":"7876","ip":"192.168.199.106","port":1526}
//新增  往kafka寫入新記錄(key不相同),flink會(huì)持續(xù)消費(fèi)kafka并將數(shù)據(jù)通過Map的put()方法存入廣播狀態(tài)
//修改  往kafka寫入新記錄(key相同),put()方法覆蓋之前的這條記錄以達(dá)到更新的目的
//比如需要更新itemid的ip和port值,要求往kafka中寫入一條新數(shù)據(jù),比如更新itemid 7875的ip和port
{"itemid":"7875","ip":"192.168.199.105","port":1525}

2.2 刪除記錄

//kafka中的數(shù)據(jù),flink消費(fèi)后存儲(chǔ)到廣播狀態(tài),以itemid為key進(jìn)行存儲(chǔ)
{"itemid":"7875","ip":"192.168.199.104","port":1521}
{"itemid":"7876","ip":"192.168.199.106","port":1526}
//如果需要?jiǎng)h除某條記錄,往kafka中寫入帶有key的數(shù)據(jù)和刪除標(biāo)記即可
//比如刪除itemid為7875的記錄,要求往kafka中寫入一條新數(shù)據(jù),程序刪除廣播中itemid7875的記錄
{"itemid":"7875","isRemove":true}

由于消費(fèi)kafka流是實(shí)時(shí)的,kafka的新記錄會(huì)實(shí)時(shí)進(jìn)行消費(fèi),根據(jù)新記錄的內(nèi)容對(duì)廣播數(shù)據(jù)實(shí)時(shí)的進(jìn)行新增,修改或刪除

同時(shí)由于kafka中的數(shù)據(jù)是不可變的,當(dāng)程序需要重啟時(shí),只需從頭消費(fèi)kafka即可,由于具有冪等性,最終的廣播數(shù)據(jù)是不會(huì)變的。

示例代碼

//Flink消費(fèi)外部kafka規(guī)則數(shù)據(jù)作為流
FlinkKafkaConsumer ruleKafkaConsumer = new FlinkKafkaConsumer("topic",new ItemRuleEntiyPojoSchema(),properties);
DataStream ruleStream = env.addSource(ruleKafkaConsumer);

//廣播方法
@Override
public void processBroadcastElement(ItemRuleEntiy value,
 BroadcastProcessFunction, ItemRuleEntiy, Map>.Context ctx,
 Collector> out) throws Exception {
BroadcastState broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
if (StringUtils.isNoneBlank(value.getItemid())) {
 System.out.println("獲取到新的廣播規(guī)則:" + value);
 //相比廣播變量,這里每次只存一條規(guī)則,相同key則覆蓋修改
 broadcastState.put(value.getItemid(), value); // 存放數(shù)據(jù)到廣播
}
}
@Override
public void processElement(Map value,
 BroadcastProcessFunction, ItemRuleEntiy, Map>.ReadOnlyContext ctx,
 Collector> out) throws Exception {
ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
Object itemidObj = value.get("itemid"); // 源kafka流中數(shù)據(jù)獲取itemid
if (itemidObj == null) {
 return;
}
// 根據(jù)item從廣播數(shù)據(jù)中查找規(guī)則,能查到,則增加ip,port字段
ItemRuleEntiy itemRule = broadcastState.get(itemidObj.toString());
if (itemRule != null) { // 從廣播中撈取到數(shù)據(jù)時(shí)
 value.put("ip", itemRule.getIp());
 value.put("port", itemRule.getPort());
 out.collect(value);
}
}

執(zhí)行結(jié)果

//  所有廣播規(guī)則數(shù)據(jù):
7875={itemid=7875, ip=192.168.199.104, port=1521}
7876={itemid=7876, ip=192.168.199.106, port=1526}
//itemid=7875關(guān)聯(lián)ip=192.168.199.104
({host=orcl, itemid=7875,value=1, ip=192.168.199.104, port=1521},7875)
//kafka寫入{itemid=7875, ip=192.168.199.105, port=1521}
 獲取到新的廣播規(guī)則:{itemid=7875, ip=192.168.199.105, port=1521}
 //itemid=7875關(guān)聯(lián)ip=192.168.199.105
{host=orcl,itemid=7875, value=1, ip=192.168.199.105, port=1521},7875)



四.總結(jié)


通過kafka廣播流的方式最終實(shí)現(xiàn)了Flink與外部數(shù)據(jù)交互的實(shí)時(shí)更新,不僅是kafka,還有MQ,甚至文件格式都可以作為廣播流,廣播流要求數(shù)據(jù)不能從內(nèi)部更改(無法作為流消息被實(shí)時(shí)消費(fèi)),只能通過新增的方式進(jìn)行修改和刪除(新增記錄中key相同的表示覆蓋修改,帶key和刪除標(biāo)記的表示刪除)

相比表(mysql,oracle)只是結(jié)果的呈現(xiàn),日志(kafka或其它隊(duì)列)是一種帶有時(shí)間維度(或先后順序)信息的存儲(chǔ),可以說表是二維的,日志是三維的,通過日志可以復(fù)原每個(gè)時(shí)間點(diǎn)的表,但是表不能還原日志。

廣播作為一種流(流明顯帶有時(shí)間特性),所以當(dāng)不帶時(shí)間維度的表作為流時(shí),是沒法形成真正意義上的流,只能通過定時(shí)獲取表的全部數(shù)據(jù)作為偽流,流中每個(gè)時(shí)間點(diǎn)的數(shù)據(jù)也只能是全量表數(shù)據(jù),同時(shí)定時(shí)也就沒法做到實(shí)時(shí)獲取。只有帶時(shí)間維度的日志作為流時(shí),才能做到實(shí)時(shí)獲取,而且每次只獲取最新的一條記錄即可,不用每次獲取全部數(shù)據(jù)。

END



文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/130030.html

相關(guān)文章

  • 你公司到底需不需要引入實(shí)時(shí)計(jì)算引擎?

    摘要:再如通過處理流數(shù)據(jù)生成簡(jiǎn)單的報(bào)告,如五分鐘的窗口聚合數(shù)據(jù)平均值。復(fù)雜的事情還有在流數(shù)據(jù)中進(jìn)行數(shù)據(jù)多維度關(guān)聯(lián)聚合塞選,從而找到復(fù)雜事件中的根因。因?yàn)楦鞣N需求,也就造就了現(xiàn)在不斷出現(xiàn)實(shí)時(shí)計(jì)算框架,而下文我們將重磅介紹我們推薦的實(shí)時(shí)計(jì)算框架。 前言 先廣而告之,本文摘自本人《大數(shù)據(jù)重磅炸彈——實(shí)時(shí)計(jì)算框架 Flink》課程第二篇,內(nèi)容首發(fā)自我的知識(shí)星球,后面持續(xù)在星球里更新,這里做個(gè)預(yù)告,今...

    HackerShell 評(píng)論0 收藏0
  • 從0到1學(xué)習(xí)Flink》—— 你上傳的 jar 包藏到哪里去了?

    摘要:本篇文章連接是關(guān)注我微信公眾號(hào)另外我自己整理了些的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。你可以加我的微信,然后回復(fù)關(guān)鍵字即可無條件獲取到。 前言 寫這篇文章其實(shí)也是知識(shí)星球里面的一個(gè)小伙伴問了這樣一個(gè)問題: 通過 flink UI 儀表盤提交的 jar 是存儲(chǔ)在哪個(gè)目錄下? 這個(gè)問題其實(shí)我自己也有問過,但是自己因?yàn)樽约旱膯栴}沒有啥壓力也就沒深入去思考,現(xiàn)在可是知識(shí)星球的付費(fèi)小伙伴問的...

    trigkit4 評(píng)論0 收藏0
  • 《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門

    摘要:總結(jié)本文描述了如何在電腦上安裝,及運(yùn)行它。相關(guān)文章從到學(xué)習(xí)介紹從到學(xué)習(xí)上搭建環(huán)境并構(gòu)建運(yùn)行簡(jiǎn)單程序入門從到學(xué)習(xí)配置文件詳解從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義從到學(xué)習(xí)介紹從到學(xué)習(xí)如何自定義 showImg(https://segmentfault.com/img/remote/1460000016915923?w=1920&h=1275); 準(zhǔn)備工作 1、安裝查看 Java 的版本號(hào),推薦...

    zeyu 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<