摘要:需要注意的是和方法生成的觸發(fā)器是連續(xù)的而不是一次性的。其他的還有一次性觸發(fā)器將一次性觸發(fā)器變?yōu)檫B續(xù)型觸發(fā)器,觸發(fā)后再次等待觸發(fā)。例如與一起用可以實現(xiàn)每個數(shù)據(jù)到達后的分鐘進行處理,經(jīng)常用于全局窗口,可以用觸發(fā)器來設置停止條件。
本文參考Apache Beam官方編程手冊
可以結(jié)合官方的Mobile Game 代碼閱讀本文。
在默認情況下,Apache Beam是不分窗的,也就是采用GlobalWindow,而如果同時也不設置自定義的觸發(fā)器,那么Beam會在所有數(shù)據(jù)都收集到之后才開始對數(shù)據(jù)進行處理。這通常只能適用于有限數(shù)據(jù)且對實時性要求不高的情況。當輸入為無限流數(shù)據(jù),我們可以
1)設置合適的窗口大小(根據(jù)時間戳),在窗口末端進行數(shù)據(jù)處理;
2)設置觸發(fā)器,當條件滿足時觸發(fā),進行數(shù)據(jù)處理;
3)同時設置窗口和觸發(fā)器。
時間戳說明:Beam的數(shù)據(jù)都是保存在PCollection中。當讀入數(shù)據(jù)時,PCollection為每個元素都自動生成一個內(nèi)置的時間戳,對于無限輸入,數(shù)據(jù)的時間戳不同。而對于有限輸入,由于是同時讀入,所有的元素的時間戳都是一樣的,這時候分窗是沒有意義的(都在一個窗口)。而我們可以手動為每個元素設置時間戳,通常采用數(shù)據(jù)中已有的時間屬性(比如日志中一般都會帶有事件時間)。可以在DoFn中為數(shù)據(jù)帶上時間戳,如:
@ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(XXX)); }窗口類型:
1)全局窗口
就是默認不分窗的情況。
apply(Windows.
2)固定時間大小窗口
最常見的分窗方式,按照時間戳把數(shù)據(jù)處理窗口分為固定長度。
apply(Windows.
3)滑動窗口
需要設置2個參數(shù),窗口大小和窗口產(chǎn)生周期。窗口之間有重疊,通常用于計算平均數(shù)的情況(暫沒用過)
4)會話窗口
一般用于相同key數(shù)據(jù)聚合,同一個key的數(shù)據(jù)之間時間間隔較大的會被分到不同的窗口。
**
水位線和超時數(shù)據(jù):**
當使用用戶自定義的時間戳時,先處理的數(shù)據(jù)并不總是時間戳較小的,有可能出現(xiàn)時間戳小的數(shù)據(jù)在后面才產(chǎn)生的情況。Beam通常會給窗口設定一個處理期限時間(圖中縱軸),當超過這個時間的數(shù)據(jù)被視為超時數(shù)據(jù),而這些期限時間的連線即水位線。
系統(tǒng)會根據(jù)實際情況進行預測生成水位線,在默認情況下不對超時數(shù)據(jù)進行處理,而我們可以通過設置觸發(fā)器對超時數(shù)據(jù)進行額外處理。
觸發(fā)器種類1)時間時間觸發(fā)器
根據(jù)時間戳進行觸發(fā)。
.triggering(AfterWatermark.pastEndOfWindow()//水位線到達時觸發(fā)一次 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(FIVE_MINUTES))//水位線之前,每次觸發(fā)后第一個數(shù)據(jù)來到之后的5分鐘時再觸發(fā) .withLateFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES)))//水位線之后,每次觸發(fā)后第一個數(shù)據(jù)來到之后的10分鐘時再觸發(fā)
以上分別對水位線上中下的3種數(shù)據(jù)進行不同的處理。需要注意的是withEarlyFirings和withLateFirings方法生成的觸發(fā)器是連續(xù)的而不是一次性的。
2)處理時間觸發(fā)器
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)
如方法的字面意思,僅在第一個數(shù)據(jù)到達后的5分鐘時觸發(fā)一次。
3)數(shù)據(jù)驅(qū)動型觸發(fā)器
AfterPane.elementCountAtleast(XX)
當處理到XX個時觸發(fā)一次。需要注意的是當數(shù)據(jù)個數(shù)小于XX時永遠不會觸發(fā)數(shù)據(jù)處理。
4)混合觸發(fā)器
將多個觸發(fā)器混合起來,比如1)中的代碼就是3個觸發(fā)器混合。其他的還有
①Repeatedly.forever(一次性觸發(fā)器)
將一次性觸發(fā)器變?yōu)檫B續(xù)型觸發(fā)器,觸發(fā)后再次等待觸發(fā)。例如與AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一起用可以實現(xiàn)每個數(shù)據(jù)到達后的5分鐘進行處理,經(jīng)常用于全局窗口,可以用orFinally(觸發(fā)器)來設置停止條件。
②AfterEach.inOrder(觸發(fā)器1,觸發(fā)器2...)
當觸發(fā)器1滿足后等待觸發(fā)器2...知道所有觸發(fā)器滿足后開始數(shù)據(jù)處理。
③AfterFirst(觸發(fā)器1,觸發(fā)器2..)和AfterAll(觸發(fā)器1,觸發(fā)器2..)
這2個分別為或,與的邏輯。
④orFinally
見①
Accumulating Mode:
If our trigger is set to .accumulatingFiredPanes, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
Discarding Mode:
If our trigger is set to .discardingFiredPanes, the trigger emits the following values on each firing:
First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]超時數(shù)據(jù)處理
.withAllowedLateness(Duration.XXXX(XXX))
可設置允許超時多長時間的數(shù)據(jù)。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/67679.html
摘要:與用于與的轉(zhuǎn)換。其中方法返回的是在中的位置下標。對于設置了多個觸發(fā)器的,自動選擇最后一個觸發(fā)的結(jié)算結(jié)果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關(guān)key的元素聚合到一起,通常是形成一個Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對聚...
摘要:最近在用做流上的異常檢測,期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。其中如有錯漏,歡迎指出。即從一條數(shù)據(jù)中獲得時間戳,然后以的格式返回。丟棄掉中的附加信息使用這一設置時,得到的中的元素是的和組成的鍵值對。 最近在用Apache beam做流上的異常檢測,期間遇到了很多問題,但是發(fā)現(xiàn)網(wǎng)上相關(guān)的資料很少,基本只能自己啃文檔和瞎嘗試。所以想把自己踩過的坑記錄...
摘要:一直接訪問引入的相關(guān)包使用代替給指定配置與訪問本地文件一樣訪問文件實際測試中發(fā)現(xiàn)本地如能夠成功讀寫,但是集群模式下如讀寫失敗,原因未知。二通過訪問除了直接讀寫的數(shù)據(jù),還可以通過來進行讀寫。 一、直接訪問 1.引入HDFS的相關(guān)jar包: org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0...
摘要:要說在中常見的函數(shù)是哪一個,當然是。是一個實現(xiàn)了接口的抽象類,其中是數(shù)據(jù)處理方法,強制子類必須實現(xiàn)。以上為學習一天的總結(jié),有錯誤歡迎指正。相同的是這個方法處理的都是中的一個元素。 在閱讀本文前,可先看一下官方的WordCount代碼, 對Apache Beam有大概的了解。 要說在Apache Beam中常見的函數(shù)是哪一個,當然是apply()。常見的寫法如下: [Final Outp...
摘要:主頁暫時下線社區(qū)暫時下線知識庫自媒體平臺微博知乎簡書博客園我們不是的官方組織機構(gòu)團體,只是技術(shù)棧以及的愛好者合作侵權(quán),請聯(lián)系請抄送一份到基礎(chǔ)編程思想和大數(shù)據(jù)中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔區(qū)塊 【主頁】 apachecn.org 【Github】@ApacheCN 暫時下線: 社區(qū) 暫時下線: cwiki 知識庫 自媒體平臺 ...
閱讀 1232·2021-11-11 16:54
閱讀 883·2021-10-19 11:44
閱讀 1348·2021-09-22 15:18
閱讀 2455·2019-08-29 16:26
閱讀 2958·2019-08-29 13:57
閱讀 3102·2019-08-26 13:32
閱讀 1090·2019-08-26 11:58
閱讀 2339·2019-08-26 10:37