摘要:要說(shuō)在中常見的函數(shù)是哪一個(gè),當(dāng)然是。是一個(gè)實(shí)現(xiàn)了接口的抽象類,其中是數(shù)據(jù)處理方法,強(qiáng)制子類必須實(shí)現(xiàn)。以上為學(xué)習(xí)一天的總結(jié),有錯(cuò)誤歡迎指正。相同的是這個(gè)方法處理的都是中的一個(gè)元素。
在閱讀本文前,可先看一下官方的WordCount代碼, 對(duì)Apache Beam有大概的了解。
要說(shuō)在Apache Beam中常見的函數(shù)是哪一個(gè),當(dāng)然是apply()。常見的寫法如下:
[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform])
而在最簡(jiǎn)單的wordcount代碼中,就出現(xiàn)了許多種不同的傳入?yún)?shù)類型,除了輸入輸出的部分,還包括
1)使用ParDo.of():
.apply("ExtractWords-joe", ParDo.of(new DoFn() { @ProcessElement public void processElement(ProcessContext context) { System.out.println(context.element()+"~"); for (String word : context.element().split(" ")) { if (!word.isEmpty()) { //輸出到Output PCollection context.output(word); } } } }) )
2)使用MapElements.via():
.apply("FomatResults", MapElements.via(new SimpleFunction,String>() { @Override public String apply(KV input) { return input.getKey()+":"+input.getValue(); } }))
3)以及使用PTransform子類:
.apply(new CountWords()) public static class CountWords extends PTransform, PCollection >> { @Override public PCollection > expand(PCollection lines) { // Convert lines of text into individual words. PCollection words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection > wordCounts = words.apply(Count. perElement()); return wordCounts; } }
這么多種傳入方式到底有什么聯(lián)系?通過查看源碼可以看出apply函數(shù)的定義如下:
publicOutputT apply( String name, PTransform super PBegin, OutputT> root) { return begin().apply(name, root); }
傳入的參數(shù)為PTransform類對(duì)象,也就是這幾種傳入?yún)?shù)其實(shí)都是PTransform類的變形。
PTransform是一個(gè)實(shí)現(xiàn)了Serializable接口的抽象類,其中public abstract OutputT expand(InputT input); 是數(shù)據(jù)處理方法,強(qiáng)制子類必須實(shí)現(xiàn)。
因此第(3)種方式很容易理解,就是通過繼承PTransform并實(shí)現(xiàn)了expand方法定義了CountWords類,給apply方法傳遞了一個(gè)CountWords對(duì)象。
在第(2)種方式中,MapElements是PTransform的子類,實(shí)現(xiàn)了expand方法,其實(shí)現(xiàn)方式是調(diào)用@Nullable private final SimpleFunction
public staticMapElements via( final SimpleFunction fn) { return new MapElements<>(fn, null, fn.getClass()); }
傳入了一個(gè)SimpleFunction對(duì)象,SimpleFunction是一個(gè)必須實(shí)現(xiàn)public OutputT apply(InputT input) 方法的抽象類,用戶在該apply方法中實(shí)現(xiàn)數(shù)據(jù)處理。
所以這種方式的實(shí)現(xiàn)方式如下:
定義SimpleFunction的子類并實(shí)現(xiàn)其中的apply方法,將該子類的對(duì)象傳遞給MapElements.via()。
第(1)種方式中,ParDo.of()方法傳入一個(gè)DoFn對(duì)象, 返回一個(gè)SingleOutput對(duì)象:
public staticSingleOutput of(DoFn fn) { validate(fn); return new SingleOutput ( fn, Collections. >emptyList(), displayDataForFn(fn)); }
SingleOutput與MapElements類似,也是PTransform的子類,實(shí)現(xiàn)了expand方法,使用private final DoFn
而DoFn是一個(gè)抽象類,用戶必須實(shí)現(xiàn)其注解方法(存疑) public void processElement(ProcessContext c)。
所以這種方式的實(shí)現(xiàn)方式如下:
定義DoFn的子類并實(shí)現(xiàn)其中的processElement方法,將該子類的對(duì)象傳遞給ParDo.of()。
需要注意的是processElement方法與前2種方式不同,輸入和輸出數(shù)據(jù)都是在傳入?yún)?shù)ProcessContext c中,而不是通過return進(jìn)行傳遞。
以上為學(xué)習(xí)Apache Beam一天的總結(jié),有錯(cuò)誤歡迎指正。
**
Day2補(bǔ)充,3種方式的區(qū)別和聯(lián)系:**
1)MapElement.via(SimpleFunction)和PTransform
MapElements是PTransform的一個(gè)子類:
public class MapElements
extends PTransform
從泛型參數(shù)來(lái)看,PTransform處理的是PCollection,而MapElement處理的是PCollection中的一個(gè)元素,對(duì)比SimpleFunction的apply方法和PTransform的expand方法的實(shí)現(xiàn)方式得到驗(yàn)證。
2)MapElement.via(SimpleFunction)和ParDo.of(DoFn)
區(qū)別之前已經(jīng)說(shuō)過,DoFn的processElement方法的輸入和輸出都是從參數(shù)傳入,而SimpleFunction的apply方法從參數(shù)傳入輸入,從return傳出輸出。
相同的是這2個(gè)方法處理的都是PCollection中的一個(gè)元素。
查看MapElement的expand方法源碼:
@Override public PCollectionexpand(PCollection extends InputT> input) { checkNotNull(fn, "Must specify a function on MapElements using .via()"); return input.apply( "Map", ParDo.of( new DoFn () { @ProcessElement public void processElement(ProcessContext c) { c.output(fn.apply(c.element())); } //部分代碼忽略 })); }
可以看出其實(shí)也是實(shí)現(xiàn)了DoFn的子類,在DoFn的processElement方法中調(diào)用SimpleFunction對(duì)象的apply方法進(jìn)行處理。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://m.specialneedsforspecialkids.com/yun/70467.html
摘要:與用于與的轉(zhuǎn)換。其中方法返回的是在中的位置下標(biāo)。對(duì)于設(shè)置了多個(gè)觸發(fā)器的,自動(dòng)選擇最后一個(gè)觸發(fā)的結(jié)算結(jié)果。其他不是線程安全的,一般建議處理方法是冪等的。 Combine與GroupByKey GroupByKey是把相關(guān)key的元素聚合到一起,通常是形成一個(gè)Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是對(duì)聚...
摘要:需要注意的是和方法生成的觸發(fā)器是連續(xù)的而不是一次性的。其他的還有一次性觸發(fā)器將一次性觸發(fā)器變?yōu)檫B續(xù)型觸發(fā)器,觸發(fā)后再次等待觸發(fā)。例如與一起用可以實(shí)現(xiàn)每個(gè)數(shù)據(jù)到達(dá)后的分鐘進(jìn)行處理,經(jīng)常用于全局窗口,可以用觸發(fā)器來(lái)設(shè)置停止條件。 本文參考Apache Beam官方編程手冊(cè) 可以結(jié)合官方的Mobile Game 代碼閱讀本文。 在默認(rèn)情況下,Apache Beam是不分窗的,也就是采用Gl...
摘要:首頁(yè)地址關(guān)于我們我們不是的官方組織機(jī)構(gòu)團(tuán)體,只是技術(shù)棧以及的愛好者基礎(chǔ)編程思想和大數(shù)據(jù)中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔區(qū)塊鏈中文文檔數(shù)學(xué)筆記線性代數(shù)筆記數(shù)據(jù)科學(xué)中文文檔中文文檔中文文檔課本計(jì)算 首頁(yè)地址:http://www.apachecn.org關(guān)于我們:http://www.apachecn.org/about 我們不是 Apach...
摘要:首頁(yè)地址關(guān)于我們我們不是的官方組織機(jī)構(gòu)團(tuán)體,只是技術(shù)棧以及的愛好者基礎(chǔ)編程思想和大數(shù)據(jù)中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔中文文檔區(qū)塊鏈中文文檔數(shù)學(xué)筆記線性代數(shù)筆記數(shù)據(jù)科學(xué)中文文檔中文文檔中文文檔課本計(jì)算 首頁(yè)地址:http://www.apachecn.org關(guān)于我們:http://www.apachecn.org/about 我們不是 Apach...
閱讀 2953·2023-04-26 01:32
閱讀 1548·2021-09-13 10:37
閱讀 2286·2019-08-30 15:56
閱讀 1678·2019-08-30 14:00
閱讀 3052·2019-08-30 12:44
閱讀 1969·2019-08-26 12:20
閱讀 1068·2019-08-23 16:29
閱讀 3233·2019-08-23 14:44