之前項目一直用的Flink-1.72版本,大多數用的流api進行開發的需求,現在掃描漏洞的時候必須升級到Flink-1.12.0或Flink-1.11.3,所以直接升級到Flink-1.12.0,發現之前用的api(assignTimestampsAndWatermarks)被設置為廢棄了。
先來看看項目之前用的:
后來查資料發現Flink在1.11版本中為了實現水印的通用以及方便,對水印進行了重構。
新版本的Flink在類classDataStream
WatermarkStrategy接口繼承了接口TimestampAssignerSupplier
先看一下interfaceTimestampAssignerSupplier
是創建一個TimestampAssigner
有一個longextractTimestamp方法,作用是從Flink消費的記錄中抽取時間,既可以理解為我們如果要通過業務時間進行統計時,需要通過該方法對來提取記錄的業務時間。所以用到業務時間的話,一定要根據自己的業務場景對該方法進行具體的實現。否則Flink會提供一個默認的實現RecordTimestampAssigner<>()
而默認實現的內容也十分簡單,一起看一下,必須是記錄中已經注冊了時間屬性。
接下來interfaceWatermarkGeneratorSupplier
是返回一個WatermarkGenerator
提供了兩個水印發送的方式,接下來對這兩個方式進行說明:
onEvent:每條記錄進來都會調用一次這個方法,入參有3個,第一個是記錄,第二個是記錄攜帶的時間,如果注冊了時間就會有,第三個參數時水印發射器WatermarkOutputoutput,可以通過這個參數對水印進行發射,用戶可以根據自己的業務場景來編寫自己的水印生成以及發射邏輯。該方法的重點是每條記錄都會調用.
onPeriodicEmit: 該方法是Flink提供的一個定時器方法,每隔一段時間會調用此方法,入參是WatermarkOutputoutput,用戶可以通過這個方法每隔一段時間發送一次水印,當記錄數過多時,每條記錄都發送一次水印明顯不合適,也影響性能,此時可以通過這個方法進行水印的定時發送,而onEvent只記錄當前水印而選擇不發射出去。該方法的參數配置為env.getConfig().setAutoWatermarkInterval(300L),入參是毫秒數,表示隔多少毫秒向下游算子發送一次水印。
而WatermarkStrategy中也提供了一些常用的WatermarkGenerator
BoundedOutOfOrdernessWatermarks
使用方法也十分的簡單,提供的是一個靜態方法,只需直接調用即可
WatermarkStrategy.
最后結合項目的需求將原來的使用水印的地方改成如下了
類圖及FLINK水印算子簡要流程
先上類圖,方便理解
接著簡單介紹下流程
首先TimestampsAndWatermarksOperator算子會在open方法中初始化用戶定義的水印邏輯及方式,并且如果需要定時發送水印會,注冊一個定時器觸發水印定時發送。
當元素到達算子后會調用processElement(StreamRecord
方法很簡單,如果元素已經被注冊了時間,就直接獲取時間,或者設置為LONG.MIN_VALUE,然后根據用戶定義的timestampAssigner.extractTimestamp從記錄中抽取時間屬性,然后再將時間寫入元素中,最后調用用戶定義的watermarkGenerator.onEvent方法,根據用戶的邏輯選擇刷新水印以及是否發射水印。
上面初始化中提到了,如果需要定時發送水印,則會注冊一個定時器,而定時器的方法如下
通過onProcessingTime來觸發定時器的內容,而內容也十分簡單,先調用用戶定義的watermarkGenerator.onPeriodicEmit方法發送水印,然后獲取當前時間,最后注冊當前時間加水印定時發送間隔的定時觸發器,等待下次觸發該方法。
參考資料
https://zhuanlan.zhihu.com/p/158951593
https://blog.csdn.net/zhaoyuqiang/article/details/107453466
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/130018.html
摘要:由于配置流是從關系型數據庫中讀取,速度較慢,導致實時數據流流入數據的時候,配置信息還未發送,這樣會導致有些實時數據讀取不到配置信息。從數據庫中解析出來,再去統計近兩周占比。 Flink 學習 https://github.com/zhisheng17/flink-learning 麻煩路過的各位親給這個項目點個 star,太不易了,寫了這么多,算是對我堅持下來的一種鼓勵吧! showI...
摘要:在這種情況下,清除僅指窗口中的數據元,而不是窗口元數據。紫色圓圈表示流的數據元,這些數據元由某個鍵在這種情況下是用戶,用戶和用戶劃分。 0 相關源碼 掌握Flink中三種常用的Time處理方式,掌握Flink中滾動窗口以及滑動窗口的使用,了解Flink中的watermark。 Flink 在流處理工程中支持不同的時間概念。 1 處理時間(Processing time) 執行相應算子...
摘要:另外,將機制發揚光大,對有著非常好的支持。系統也注意到并討論了和的問題。總結本文分享了四本相關的書籍和一份領域相關的論文列表篇,涉及的設計,實現,故障恢復,彈性擴展等各方面。 前言 之前也分享了不少自己的文章,但是對于 Flink 來說,還是有不少新入門的朋友,這里給大家分享點 Flink 相關的資料(國外數據 pdf 和流處理相關的 Paper),期望可以幫你更好的理解 Flink。...
摘要:每小時窗口將包括在系統時鐘指示整個小時之間到達特定操作的所有事件。平行流中的水印水印是在源函數處生成的,或直接在源函數之后生成的。源函數的每個并行子任務通常獨立生成其水印。由于其輸入流更新其事件時間,因此操作員也是如此。 showImg(https://segmentfault.com/img/remote/1460000017877320?w=1280&h=857); 前言 Flin...
摘要:由于配置流是從關系型數據庫中讀取,速度較慢,導致實時數據流流入數據的時候,配置信息還未發送,這樣會導致有些實時數據讀取不到配置信息。從數據庫中解析出來,再去統計近兩周占比。 showImg(https://segmentfault.com/img/remote/1460000019367651); Flink 學習項目代碼 https://github.com/zhisheng17/f...
閱讀 1353·2023-01-11 13:20
閱讀 1699·2023-01-11 13:20
閱讀 1211·2023-01-11 13:20
閱讀 1902·2023-01-11 13:20
閱讀 4161·2023-01-11 13:20
閱讀 2751·2023-01-11 13:20
閱讀 1397·2023-01-11 13:20
閱讀 3664·2023-01-11 13:20