摘要:從上圖可以看到接口有方法,它有一個(gè)抽象類。上面的那些自帶的可以看到都是繼承了抽象類,實(shí)現(xiàn)了其中的方法,那么我們要是自己定義自己的的話其實(shí)也是要按照這個(gè)套路來做的。
前言
再上一篇文章中 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 講解了 Flink Data Source ,那么這里就來講講 Flink Data Sink 吧。
首先 Sink 的意思是:
大概可以猜到了吧!Data sink 有點(diǎn)把數(shù)據(jù)存儲(chǔ)下來(落庫)的意思。
如上圖,Source 就是數(shù)據(jù)的來源,中間的 Compute 其實(shí)就是 Flink 干的事情,可以做一系列的操作,操作完后就把計(jì)算后的數(shù)據(jù)結(jié)果 Sink 到某個(gè)地方。(可以是 MySQL、ElasticSearch、Kafka、Cassandra 等)。這里我說下自己目前做告警這塊就是把 Compute 計(jì)算后的結(jié)果 Sink 直接告警出來了(發(fā)送告警消息到釘釘群、郵件、短信等),這個(gè) sink 的意思也不一定非得說成要把數(shù)據(jù)存儲(chǔ)到某個(gè)地方去。其實(shí)官網(wǎng)用的 Connector 來形容要去的地方更合適,這個(gè) Connector 可以有 MySQL、ElasticSearch、Kafka、Cassandra RabbitMQ 等。
Flink Data Sink前面文章 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 介紹了 Flink Data Source 有哪些,這里也看看 Flink Data Sink 支持的有哪些。
看下源碼有哪些呢?
可以看到有 Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等 Sink 的方式。
SinkFunction從上圖可以看到 SinkFunction 接口有 invoke 方法,它有一個(gè) RichSinkFunction 抽象類。
上面的那些自帶的 Sink 可以看到都是繼承了 RichSinkFunction 抽象類,實(shí)現(xiàn)了其中的方法,那么我們要是自己定義自己的 Sink 的話其實(shí)也是要按照這個(gè)套路來做的。
這里就拿個(gè)較為簡單的 PrintSinkFunction 源碼來講下:
@PublicEvolving public class PrintSinkFunctionextends RichSinkFunction { private static final long serialVersionUID = 1L; private static final boolean STD_OUT = false; private static final boolean STD_ERR = true; private boolean target; private transient PrintStream stream; private transient String prefix; /** * Instantiates a print sink function that prints to standard out. */ public PrintSinkFunction() {} /** * Instantiates a print sink function that prints to standard out. * * @param stdErr True, if the format should print to standard error instead of standard out. */ public PrintSinkFunction(boolean stdErr) { target = stdErr; } public void setTargetToStandardOut() { target = STD_OUT; } public void setTargetToStandardErr() { target = STD_ERR; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); // get the target stream stream = target == STD_OUT ? System.out : System.err; // set the prefix if we have a >1 parallelism prefix = (context.getNumberOfParallelSubtasks() > 1) ? ((context.getIndexOfThisSubtask() + 1) + "> ") : null; } @Override public void invoke(IN record) { if (prefix != null) { stream.println(prefix + record.toString()); } else { stream.println(record.toString()); } } @Override public void close() { this.stream = null; this.prefix = null; } @Override public String toString() { return "Print to " + (target == STD_OUT ? "System.out" : "System.err"); } }
可以看到它就是實(shí)現(xiàn)了 RichSinkFunction 抽象類,然后實(shí)現(xiàn)了 invoke 方法,這里 invoke 方法就是把記錄打印出來了就是,沒做其他的額外操作。
如何使用?SingleOutputStreamOperator.addSink(new PrintSinkFunction<>();
這樣就可以了,如果是其他的 Sink Function 的話需要換成對(duì)應(yīng)的。
使用這個(gè) Function 其效果就是打印從 Source 過來的數(shù)據(jù),和直接 Source.print() 效果一樣。
下篇文章我們將講解下如何自定義自己的 Sink Function,并使用一個(gè) demo 來教大家,讓大家知道這個(gè)套路,且能夠在自己工作中自定義自己需要的 Sink Function,來完成自己的工作需求。
最后本文主要講了下 Flink 的 Data Sink,并介紹了常見的 Data Sink,也看了下源碼的 SinkFunction,介紹了一個(gè)簡單的 Function 使用, 告訴了大家自定義 Sink Function 的套路,下篇文章帶大家寫個(gè)。
關(guān)注我轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/10/29/flink-sink/
另外我自己整理了些 Flink 的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。你可以加我的微信:zhisheng_tian,然后回復(fù)關(guān)鍵字:Flink 即可無條件獲取到。
相關(guān)文章1、《從0到1學(xué)習(xí)Flink》—— Apache Flink 介紹
2、《從0到1學(xué)習(xí)Flink》—— Mac 上搭建 Flink 1.6.0 環(huán)境并構(gòu)建運(yùn)行簡單程序入門
3、《從0到1學(xué)習(xí)Flink》—— Flink 配置文件詳解
4、《從0到1學(xué)習(xí)Flink》—— Data Source 介紹
5、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Source ?
6、《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹
7、《從0到1學(xué)習(xí)Flink》—— 如何自定義 Data Sink ?
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72087.html
摘要:前言前篇文章從到學(xué)習(xí)介紹介紹了,也介紹了自帶的,那么如何自定義自己的呢這篇文章將寫一個(gè)教大家將從的數(shù)據(jù)到中去。 showImg(https://segmentfault.com/img/remote/1460000016990655?w=1920&h=1281); 前言 前篇文章 《從0到1學(xué)習(xí)Flink》—— Data Sink 介紹 介紹了 Flink Data Sink,也介紹...
摘要:從到學(xué)習(xí)介紹從到學(xué)習(xí)介紹其中包括了和的,后面我也講了下如何自定義自己的和。這個(gè)問題可是線上很容易遇到的關(guān)注我轉(zhuǎn)載請務(wù)必注明原創(chuàng)地址為微信公眾號(hào)另外我自己整理了些的學(xué)習(xí)資料,目前已經(jīng)全部放到微信公眾號(hào)了。 showImg(https://segmentfault.com/img/remote/1460000017935460?w=1280&h=853); 前言 前面 FLink 的文章中...
閱讀 2069·2021-09-22 15:43
閱讀 8734·2021-09-22 15:07
閱讀 1086·2021-09-03 10:28
閱讀 2059·2021-08-19 10:57
閱讀 1071·2020-01-08 12:18
閱讀 2978·2019-08-29 15:09
閱讀 1530·2019-08-29 14:05
閱讀 1645·2019-08-29 13:57