国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

入門教程 | 5分鐘從零構(gòu)建第一個 Flink 應(yīng)用

Mike617 / 715人閱讀

摘要:接著我們將數(shù)據(jù)流按照單詞字段即號索引字段做分組,這里可以簡單地使用方法,得到一個以單詞為的數(shù)據(jù)流。得到的結(jié)果數(shù)據(jù)流,將每秒輸出一次這秒內(nèi)每個單詞出現(xiàn)的次數(shù)。最后一件事就是將數(shù)據(jù)流打印到控制臺,并開始執(zhí)行最后的調(diào)用是啟動實際作業(yè)所必需的。

本文轉(zhuǎn)載自 Jark’s Blog ,作者伍翀(云邪),Apache Flink Committer,阿里巴巴高級開發(fā)工程師。 本文將從開發(fā)環(huán)境準備、創(chuàng)建 Maven 項目,編寫 Flink 程序、運行程序等方面講述如何迅速搭建第一個 Flink 應(yīng)用。 在本文中,我們將從零開始,教您如何構(gòu)建第一個 Flink 應(yīng)用程序。

開發(fā)環(huán)境準備

Flink 可以運行在 Linux, Max OS X, 或者是 Windows 上。為了開發(fā) Flink 應(yīng)用程序,在本地機器上需要有 Java 8.xmaven 環(huán)境。

如果有 Java 8 環(huán)境,運行下面的命令會輸出如下版本信息:

$ java -version
java version "1.8.0_65"
Java(TM) SE Runtime Environment (build 1.8.0_65-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.65-b01, mixed mode)
如果有 maven 環(huán)境,運行下面的命令會輸出如下版本信息:

$ mvn -version
Apache Maven 3.5.4 (1edded0938998edf8bf061f1ceb3cfdeccf443fe; 2018-06-18T02:33:14+08:00)
Maven home: /Users/wuchong/dev/maven
Java version: 1.8.0_65, vendor: Oracle Corporation, runtime: /Library/Java/JavaVirtualMachines/jdk1.8.0_65.jdk/Contents/Home/jre
Default locale: zh_CN, platform encoding: UTF-8
OS name: "mac os x", version: "10.13.6", arch: "x86_64", family: "mac"
另外我們推薦使用 ItelliJ IDEA (社區(qū)免費版已夠用)作為 Flink 應(yīng)用程序的開發(fā) IDE。Eclipse 雖然也可以,但是 Eclipse 在 Scala 和 Java 混合型項目下會有些已知問題,所以不太推薦 Eclipse。下一章節(jié),我們會介紹如何創(chuàng)建一個 Flink 工程并將其導(dǎo)入 ItelliJ IDEA。
創(chuàng)建 Maven 項目

我們將使用 Flink Maven Archetype 來創(chuàng)建我們的項目結(jié)構(gòu)和一些初始的默認依賴。在你的工作目錄下,運行如下命令來創(chuàng)建項目:

mvn archetype:generate 
    -DarchetypeGroupId=org.apache.flink 
    -DarchetypeArtifactId=flink-quickstart-java 
    -DarchetypeVersion=1.6.1 
    -DgroupId=my-flink-project 
    -DartifactId=my-flink-project 
    -Dversion=0.1 
    -Dpackage=myflink 
    -DinteractiveMode=false

你可以編輯上面的 groupId, artifactId, package 成你喜歡的路徑。使用上面的參數(shù),Maven 將自動為你創(chuàng)建如下所示的項目結(jié)構(gòu):

$ tree my-flink-project
my-flink-project
├── pom.xml
└── src
    └── main
        ├── java
        │   └── myflink
        │       ├── BatchJob.java
        │       └── StreamingJob.java
        └── resources
            └── log4j.properties

我們的 pom.xml 文件已經(jīng)包含了所需的 Flink 依賴,并且在 src/main/java 下有幾個示例程序框架。接下來我們將開始編寫第一個 Flink 程序。

編寫 Flink 程序

啟動 IntelliJ IDEA,選擇 "Import Project"(導(dǎo)入項目),選擇 my-flink-project 根目錄下的 pom.xml。根據(jù)引導(dǎo),完成項目導(dǎo)入。

在 src/main/java/myflink 下創(chuàng)建 SocketWindowWordCount.java 文件:

package myflink;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

  }
}

現(xiàn)在這程序還很基礎(chǔ),我們會一步步往里面填代碼。注意下文中我們不會將 import 語句也寫出來,因為 IDE會自動將他們添加上去。在本節(jié)末尾,我會將完整的代碼展示出來,如果你想跳過下面的步驟,可以直接將最后的完整代碼粘到編輯器中。

Flink 程序的第一步是創(chuàng)建一個 StreamExecutionEnvironment 。這是一個入口類,可以用來設(shè)置參數(shù)和創(chuàng)建數(shù)據(jù)源以及提交任務(wù)。所以讓我們把它添加到 main 函數(shù)中:

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

下一步我們將創(chuàng)建一個從本地端口號 9000 的 socket 中讀取數(shù)據(jù)的數(shù)據(jù)源:

DataStream text = env.socketTextStream("localhost", 9000, "
");

這創(chuàng)建了一個字符串類型的 DataStreamDataStream 是 Flink 中做流處理的核心 API,上面定義了非常多常見的操作(如,過濾、轉(zhuǎn)換、聚合、窗口、關(guān)聯(lián)等)。在本示例中,我們感興趣的是每個單詞在特定時間窗口中出現(xiàn)的次數(shù),比如說5秒窗口。為此,我們首先要將字符串?dāng)?shù)據(jù)解析成單詞和次數(shù)(使用Tuple2表示),第一個字段是單詞,第二個字段是次數(shù),次數(shù)初始值都設(shè)置成了1。我們實現(xiàn)了一個flatmap,因為一行數(shù)據(jù)中可能有多個單詞。

DataStream> wordCounts = text
        .flatMap(new FlatMapFunction>() {
          @Override
          public void flatMap(String value, Collector> out) {
            for (String word : value.split("s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        });

接著我們將數(shù)據(jù)流按照單詞字段(即0號索引字段)做分組,這里可以簡單地使用 keyBy(int index)方法,得到一個以單詞為 key 的Tuple2數(shù)據(jù)流。然后我們可以在流上指定想要的窗口,并根據(jù)窗口中的數(shù)據(jù)計算結(jié)果。在我們的例子中,我們想要每5秒聚合一次單詞數(shù),每個窗口都是從零開始統(tǒng)計的。

DataStream> windowCounts = wordCounts
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

第二個調(diào)用的 .timeWindow()指定我們想要5秒的翻滾窗口(Tumble)。第三個調(diào)用為每個key每個窗口指定了sum聚合函數(shù),在我們的例子中是按照次數(shù)字段(即1號索引字段)相加。得到的結(jié)果數(shù)據(jù)流,將每5秒輸出一次這5秒內(nèi)每個單詞出現(xiàn)的次數(shù)。

最后一件事就是將數(shù)據(jù)流打印到控制臺,并開始執(zhí)行:

windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");

最后的 env.execute調(diào)用是啟動實際Flink作業(yè)所必需的。所有算子操作(例如創(chuàng)建源、聚合、打印)只是構(gòu)建了內(nèi)部算子操作的圖形。只有在execute()被調(diào)用時才會在提交到集群上或本地計算機上執(zhí)行。

下面是完整的代碼,部分代碼經(jīng)過簡化(代碼在 GitHub 上也能訪問到):

package myflink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

  public static void main(String[] args) throws Exception {

    // 創(chuàng)建 execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 通過連接 socket 獲取輸入數(shù)據(jù),這里連接到本地9000端口,如果9000端口已被占用,請換一個端口
    DataStream text = env.socketTextStream("localhost", 9000, "
");

    // 解析數(shù)據(jù),按 word 分組,開窗,聚合
    DataStream> windowCounts = text
        .flatMap(new FlatMapFunction>() {
          @Override
          public void flatMap(String value, Collector> out) {
            for (String word : value.split("s")) {
              out.collect(Tuple2.of(word, 1));
            }
          }
        })
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

    // 將結(jié)果打印到控制臺,注意這里使用的是單線程打印,而非多線程
    windowCounts.print().setParallelism(1);

    env.execute("Socket Window WordCount");
  }
}
運行程序

要運行示例程序,首先我們在終端啟動 netcat 獲得輸入流:

nc -lk 9000

如果是 Windows 平臺,可以通過 nmap.org/ncat/ 安裝 ncat 然后運行:

ncat -lk 9000

然后直接運行SocketWindowWordCount的 main 方法。

只需要在 netcat 控制臺輸入單詞,就能在 SocketWindowWordCount 的輸出控制臺看到每個單詞的詞頻統(tǒng)計。如果想看到大于1的計數(shù),請在5秒內(nèi)反復(fù)鍵入相同的單詞。

Cheers !

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/6850.html

相關(guān)文章

  • 《從0到1學(xué)習(xí)Flink》—— 介紹Flink中的Stream Windows

    摘要:在每個事件上,觸發(fā)器都可以決定觸發(fā)即清除刪除窗口并丟棄其內(nèi)容,或者啟動并清除窗口。請注意,指定的觸發(fā)器不會添加其他觸發(fā)條件,但會替換當(dāng)前觸發(fā)器。結(jié)論對于現(xiàn)代流處理器來說,支持連續(xù)數(shù)據(jù)流上的各種類型的窗口是必不可少的。 showImg(https://segmentfault.com/img/remote/1460000017892799?w=1280&h=720); 前言 目前有許多數(shù)...

    jifei 評論0 收藏0
  • Flink入門

    摘要:簡介是一個面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開源計算平臺,提供支持流處理和批處理兩種類型應(yīng)用的功能。每一個數(shù)據(jù)流起始于一個或多個,并終止于一個或多個。 Flink簡介 Apache Flink 是一個面向分布式數(shù)據(jù)流處理和批量數(shù)據(jù)處理的開源計算平臺,提供支持流處理和批處理兩種類型應(yīng)用的功能。 Apache Flink的前身是柏林理工大學(xué)一個研究性項目,在2014被Apache孵化器...

    余學(xué)文 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<