摘要:默認(rèn)情況下,當(dāng)數(shù)據(jù)元到達(dá)時(shí),分段接收器將按當(dāng)前系統(tǒng)時(shí)間拆分,并使用日期時(shí)間模式命名存儲(chǔ)區(qū)。如果需要,可以使用數(shù)據(jù)元或元組的屬性來確定目錄。這將調(diào)用傳入的數(shù)據(jù)元并將它們寫入部分文件,由換行符分隔。消費(fèi)者的消費(fèi)者被稱為或等。
1 概覽 1.1 預(yù)定義的源和接收器
Flink內(nèi)置了一些基本數(shù)據(jù)源和接收器,并且始終可用。該預(yù)定義的數(shù)據(jù)源包括文件,目錄和插socket,并從集合和迭代器攝取數(shù)據(jù)。該預(yù)定義的數(shù)據(jù)接收器支持寫入文件和標(biāo)準(zhǔn)輸入輸出及socket。
1.2 綁定連接器連接器提供用于與各種第三方系統(tǒng)連接的代碼。目前支持這些系統(tǒng):
Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)
Google PubSub (source/sink)
要在應(yīng)用程序中使用其中一個(gè)連接器,通常需要其他第三方組件,例如數(shù)據(jù)存儲(chǔ)或消息隊(duì)列的服務(wù)器。
雖然本節(jié)中列出的流連接器是Flink項(xiàng)目的一部分,并且包含在源版本中,但它們不包含在二進(jìn)制分發(fā)版中。1.3 Apache Bahir中的連接器
Flink的其他流處理連接器正在通過Apache Bahir發(fā)布,包括:
Apache ActiveMQ (source/sink)
Apache Flume (sink)
Redis (sink)
Akka (sink)
Netty (source)
1.4 其他連接到Flink的方法 1.4.1 通過異步I / O進(jìn)行數(shù)據(jù)渲染使用連接器不是將數(shù)據(jù)輸入和輸出Flink的唯一方法。一種常見的模式是在一個(gè)Map或多個(gè)FlatMap 中查詢外部數(shù)據(jù)庫或Web服務(wù)以渲染主數(shù)據(jù)流。
Flink提供了一個(gè)用于異步I / O的API, 以便更有效,更穩(wěn)健地進(jìn)行這種渲染。
1.4.2 可查詢狀態(tài)當(dāng)Flink應(yīng)用程序?qū)⒋罅繑?shù)據(jù)推送到外部數(shù)據(jù)存儲(chǔ)時(shí),這可能會(huì)成為I / O瓶頸。如果所涉及的數(shù)據(jù)具有比寫入更少的讀取,則更好的方法可以是外部應(yīng)用程序從Flink獲取所需的數(shù)據(jù)。在可查詢的狀態(tài)界面,允許通過Flink被管理的狀態(tài),按需要查詢支持這個(gè)。
2 HDFS連接器此連接器提供一個(gè)Sink,可將分區(qū)文件寫入任一Hadoop文件系統(tǒng)支持的文件系統(tǒng) 。
要使用此連接器,請將以下依賴項(xiàng)添加到項(xiàng)目中:
請注意,流連接器當(dāng)前不是二進(jìn)制發(fā)布的一部分2.1 Bucketing File Sink
可以配置分段行為以及寫入,但我們稍后會(huì)介紹。這是可以創(chuàng)建一個(gè)默認(rèn)情況下匯總到按時(shí)間拆分的滾動(dòng)文件的存儲(chǔ)槽的方法
Java
Scala
唯一必需的參數(shù)是存儲(chǔ)桶的基本路徑。可以通過指定自定義bucketer,寫入器和批量大小來進(jìn)一步配置接收器。
默認(rèn)情況下,當(dāng)數(shù)據(jù)元到達(dá)時(shí),分段接收器將按當(dāng)前系統(tǒng)時(shí)間拆分,并使用日期時(shí)間模式"yyyy-MM-dd--HH"命名存儲(chǔ)區(qū)。這種模式傳遞給 DateTimeFormatter使用當(dāng)前系統(tǒng)時(shí)間和JVM的默認(rèn)時(shí)區(qū)來形成存儲(chǔ)桶路徑。用戶還可以為bucketer指定時(shí)區(qū)以格式化存儲(chǔ)桶路徑。每當(dāng)遇到新日期時(shí),都會(huì)創(chuàng)建一個(gè)新存儲(chǔ)桶。
例如,如果有一個(gè)包含分鐘作為最精細(xì)粒度的模式,將每分鐘獲得一個(gè)新桶。每個(gè)存儲(chǔ)桶本身都是一個(gè)包含多個(gè)部分文件的目錄:接收器的每個(gè)并行實(shí)例將創(chuàng)建自己的部件文件,當(dāng)部件文件變得太大時(shí),接收器也會(huì)在其他文件旁邊創(chuàng)建新的部件文件。當(dāng)存儲(chǔ)桶變?yōu)榉腔顒?dòng)狀態(tài)時(shí),將刷新并關(guān)閉打開的部件文件。如果存儲(chǔ)桶最近未寫入,則視為非活動(dòng)狀態(tài)。默認(rèn)情況下,接收器每分鐘檢查一次非活動(dòng)存儲(chǔ)桶,并關(guān)閉任何超過一分鐘未寫入的存儲(chǔ)桶。setInactiveBucketCheckInterval()并 setInactiveBucketThreshold()在一個(gè)BucketingSink。
也可以通過指定自定義bucketer setBucketer()上BucketingSink。如果需要,bucketer可以使用數(shù)據(jù)元或元組的屬性來確定bucket目錄。
默認(rèn)編寫器是StringWriter。這將調(diào)用toString()傳入的數(shù)據(jù)元并將它們寫入部分文件,由換行符分隔。在a setWriter() 上指定自定義編寫器使用BucketingSink。如果要編寫Hadoop SequenceFiles,可以使用提供的 SequenceFileWriter,也可以配置為使用壓縮。
有兩個(gè)配置選項(xiàng)指定何時(shí)應(yīng)關(guān)閉零件文件并啟動(dòng)新零件文件:
通過設(shè)置批量大小(默認(rèn)部件文件大小為384 MB)
通過設(shè)置批次滾動(dòng)時(shí)間間隔(默認(rèn)滾動(dòng)間隔為Long.MAX_VALUE)
當(dāng)滿足這兩個(gè)條件中的任何一個(gè)時(shí),將啟動(dòng)新的部分文件。看如下例子:
Java
Scala
這將創(chuàng)建一個(gè)接收器,該接收器將寫入遵循此模式的存儲(chǔ)桶文件:
Java
生成結(jié)果
date-time是我們從日期/時(shí)間格式獲取的字符串
parallel-task是并行接收器實(shí)例的索引
count是由于批處理大小或批處理翻轉(zhuǎn)間隔而創(chuàng)建的部分文件的運(yùn)行數(shù)
然而這種方式創(chuàng)建了太多小文件,不適合HDFS!僅供娛樂!
3 Apache Kafka連接器 3.1 簡介此連接器提供對Apache Kafka服務(wù)的事件流的訪問。
Flink提供特殊的Kafka連接器,用于從/向Kafka主題讀取和寫入數(shù)據(jù)。Flink Kafka Consumer集成了Flink的檢查點(diǎn)機(jī)制,可提供一次性處理語義。為實(shí)現(xiàn)這一目標(biāo),F(xiàn)link并不完全依賴Kafka的消費(fèi)者群體偏移跟蹤,而是在內(nèi)部跟蹤和檢查這些偏移。
為用例和環(huán)境選擇一個(gè)包(maven artifact id)和類名。對于大多數(shù)用戶來說,F(xiàn)linkKafkaConsumer08(部分flink-connector-kafka)是合適的。
然后,導(dǎo)入maven項(xiàng)目中的連接器:
環(huán)境配置參考
3.2 ZooKeeper安裝及配置下載zk
http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.15.1.tar.gz
配置系統(tǒng)環(huán)境
修改配置數(shù)據(jù)存儲(chǔ)路徑
啟動(dòng)3.3 Kafka部署及測試假設(shè)你剛剛開始并且沒有現(xiàn)有的Kafka或ZooKeeper數(shù)據(jù)
由于Kafka控制臺(tái)腳本對于基于Unix和Windows的平臺(tái)不同,因此在Windows平臺(tái)上使用bin windows 而不是bin /,并將腳本擴(kuò)展名更改為.bat。Step 1:下載代碼
下載
解壓
配置環(huán)境變量
配置服務(wù)器屬性
修改日志存儲(chǔ)路徑
修改主機(jī)名
Step 2: 啟動(dòng)服務(wù)器Kafka使用ZooKeeper,因此如果還沒有ZooKeeper服務(wù)器,則需要先啟動(dòng)它。
后臺(tái)模式啟動(dòng)
Step 3: 創(chuàng)建一個(gè)主題創(chuàng)建topic
Step 4: 發(fā)送一些消息Kafka附帶一個(gè)命令行客戶端,它將從文件或標(biāo)準(zhǔn)輸入中獲取輸入,并將其作為消息發(fā)送到Kafka集群。 默認(rèn)情況下,每行將作為多帶帶的消息發(fā)送。
運(yùn)行生產(chǎn)者,然后在控制臺(tái)中鍵入一些消息以發(fā)送到服務(wù)器。
啟動(dòng)生產(chǎn)者
Step 5: 啟動(dòng)一個(gè)消費(fèi)者Kafka還有一個(gè)命令行使用者,它會(huì)將消息轉(zhuǎn)儲(chǔ)到標(biāo)準(zhǔn)輸出。
分屏,新建消費(fèi)端
在不同的終端中運(yùn)行上述每個(gè)命令,那么現(xiàn)在應(yīng)該能夠在生產(chǎn)者終端中鍵入消息并看到它們出現(xiàn)在消費(fèi)者終端中
所有命令行工具都有其他選項(xiàng); 運(yùn)行不帶參數(shù)的命令將顯示更詳細(xì)地記錄它們的使用信息。
3.4 Kafka 1.0.0+ Connector從Flink 1.7開始,有一個(gè)新的通用Kafka連接器,它不跟蹤特定的Kafka主要版本。 相反,它在Flink發(fā)布時(shí)跟蹤最新版本的Kafka。
如果您的Kafka代理版本是1.0.0或更高版本,則應(yīng)使用此Kafka連接器。 如果使用舊版本的Kafka(0.11,0.10,0.9或0.8),則應(yīng)使用與代理版本對應(yīng)的連接器。
兼容性通過Kafka客戶端API和代理的兼容性保證,通用Kafka連接器與較舊和較新的Kafka代理兼容。 它與版本0.11.0或更高版本兼容,具體取決于所使用的功能。
將Kafka Connector從0.11遷移到通用(V1.10新增)要執(zhí)行遷移,請參閱升級(jí)作業(yè)和Flink版本指南和
在整個(gè)過程中使用Flink 1.9或更新版本。
不要同時(shí)升級(jí)Flink和操作符。
確保您作業(yè)中使用的Kafka Consumer和/或Kafka Producer分配了唯一標(biāo)識(shí)符(uid):
使用stop with savepoint功能獲取保存點(diǎn)(例如,使用stop --withSavepoint)CLI命令。
用法要使用通用Kafka連接器,請為其添加依賴關(guān)系:
然后實(shí)例化新源(FlinkKafkaConsumer)
Flink Kafka Consumer是一個(gè)流數(shù)據(jù)源,可以從Apache Kafka中提取并行數(shù)據(jù)流。 使用者可以在多個(gè)并行實(shí)例中運(yùn)行,每個(gè)實(shí)例都將從一個(gè)或多個(gè)Kafka分區(qū)中提取數(shù)據(jù)。
Flink Kafka Consumer參與了檢查點(diǎn),并保證在故障期間沒有數(shù)據(jù)丟失,并且計(jì)算處理元素“恰好一次”。(注意:這些保證自然會(huì)假設(shè)Kafka本身不會(huì)丟失任何數(shù)據(jù)。)
請注意,F(xiàn)link在內(nèi)部將偏移量作為其分布式檢查點(diǎn)的一部分進(jìn)行快照。 承諾給Kafka的抵消只是為了使外部的進(jìn)展觀與Flink對進(jìn)展的看法同步。 這樣,監(jiān)控和其他工作可以了解Flink Kafka消費(fèi)者在多大程度上消耗了一個(gè)主題。
和接收器(FlinkKafkaProducer)。
除了從模塊和類名中刪除特定的Kafka版本之外,API向后兼容Kafka 0.11連接器。
3.5 Kafka消費(fèi)者Flink的Kafka消費(fèi)者被稱為FlinkKafkaConsumer08(或09Kafka 0.9.0.x等)。它提供對一個(gè)或多個(gè)Kafka主題的訪問。
構(gòu)造函數(shù)接受以下參數(shù):
主題名稱/主題名稱列表
DeserializationSchema / KeyedDeserializationSchema用于反序列化來自Kafka的數(shù)據(jù)
Kafka消費(fèi)者的屬性。需要以下屬性:
“bootstrap.servers”(以逗號(hào)分隔的Kafka經(jīng)紀(jì)人名單)
“zookeeper.connect”(逗號(hào)分隔的Zookeeper服務(wù)器列表)(僅Kafka 0.8需要)
“group.id”消費(fèi)者群組的ID
上述程序注意配置ip主機(jī)映射
虛擬機(jī)hosts
本地機(jī)器 hosts
發(fā)送消息
運(yùn)行程序消費(fèi)消息
Example:
Java
Scala
The DeserializationSchemaFlink Kafka Consumer需要知道如何將Kafka中的二進(jìn)制數(shù)據(jù)轉(zhuǎn)換為Java / Scala對象。
在 DeserializationSchema允許用戶指定這樣的一個(gè)架構(gòu)。T deserialize(byte[] message) 為每個(gè)Kafka消息調(diào)用該方法,從Kafka傳遞值。
從它開始通常很有幫助AbstractDeserializationSchema,它負(fù)責(zé)將生成的Java / Scala類型描述為Flink的類型系統(tǒng)。實(shí)現(xiàn)vanilla的用戶DeserializationSchema需要自己實(shí)現(xiàn)該getProducedType(...)方法。
為了訪問Kafka消息的鍵和值,KeyedDeserializationSchema具有以下deserialize方法T deserialize(byte [] messageKey,byte [] message,String topic,int partition,long offset)。
為方便起見,F(xiàn)link提供以下模式:
TypeInformationSerializationSchema(和TypeInformationKeyValueSerializationSchema)創(chuàng)建基于Flink的模式TypeInformation。如果Flink編寫和讀取數(shù)據(jù),這將非常有用。此模式是其他通用序列化方法的高性能Flink替代方案。
JsonDeserializationSchema(和JSONKeyValueDeserializationSchema)將序列化的JSON轉(zhuǎn)換為ObjectNode對象,可以使用objectNode.get(“field”)作為(Int / String / ...)()從中訪問字段。KeyValue objectNode包含一個(gè)“key”和“value”字段,其中包含所有字段,以及一個(gè)可選的“元數(shù)據(jù)”字段,用于公開此消息的偏移量/分區(qū)/主題。
AvroDeserializationSchema它使用靜態(tài)提供的模式讀取使用Avro格式序列化的數(shù)據(jù)。它可以從Avro生成的類(AvroDeserializationSchema.forSpecific(...))中推斷出模式,也可以GenericRecords 使用手動(dòng)提供的模式(with AvroDeserializationSchema.forGeneric(...))。此反序列化架構(gòu)要求序列化記錄不包含嵌入式架構(gòu)。
還有一個(gè)可用的模式版本,可以在Confluent Schema Registry中查找編寫器的模式(用于編寫記錄的 模式)。使用這些反序列化模式記錄將使用從模式注冊表中檢索的模式進(jìn)行讀取,并轉(zhuǎn)換為靜態(tài)提供的模式(通過 ConfluentRegistryAvroDeserializationSchema.forGeneric(...)或ConfluentRegistryAvroDeserializationSchema.forSpecific(...))。
要使用此反序列化模式,必須添加以下附加依賴項(xiàng):
當(dāng)遇到因任何原因無法反序列化的損壞消息時(shí),有兩個(gè)選項(xiàng) - 從deserialize(...)方法中拋出異常將導(dǎo)致作業(yè)失敗并重新啟動(dòng),或者返回null以允許Flink Kafka使用者以靜默方式跳過損壞的消息。請注意,由于使用者的容錯(cuò)能力(請參閱下面的部分以獲取更多詳細(xì)信息),因此對損壞的消息執(zhí)行失敗將使消費(fèi)者嘗試再次反序列化消息。因此,如果反序列化仍然失敗,則消費(fèi)者將在該損壞的消息上進(jìn)入不間斷重啟和失敗循環(huán)。
3.6 Kafka生產(chǎn)者Flink的Kafka Producer被稱為FlinkKafkaProducer011(或010 對于Kafka 0.10.0.x版本。或者直接就是FlinkKafkaProducer,對于Kafka>=1.0.0的版本來說)。
它允許將記錄流寫入一個(gè)或多個(gè)Kafka主題。
自應(yīng)用Pro
確保啟動(dòng)端口
Pro端生產(chǎn)消息
消費(fèi)端接收
Example
Java
Scala
上面的示例演示了創(chuàng)建Flink Kafka Producer以將流寫入單個(gè)Kafka目標(biāo)主題的基本用法。對于更高級(jí)的用法,還有其他構(gòu)造函數(shù)變體允許提供以下內(nèi)容:
提供自定義屬性
生產(chǎn)者允許為內(nèi)部的KafkaProducer提供自定義屬性配置。
自定義分區(qū)程序
將記錄分配給特定分區(qū),可以為FlinkKafkaPartitioner構(gòu)造函數(shù)提供實(shí)現(xiàn)。將為流中的每個(gè)記錄調(diào)用此分區(qū)程序,以確定應(yīng)將記錄發(fā)送到的目標(biāo)主題的確切分區(qū)。
高級(jí)序列化模式
與消費(fèi)者類似,生產(chǎn)者還允許使用調(diào)用的高級(jí)序列化模式KeyedSerializationSchema,該模式允許多帶帶序列化鍵和值。它還允許覆蓋目標(biāo)主題,以便一個(gè)生產(chǎn)者實(shí)例可以將數(shù)據(jù)發(fā)送到多個(gè)主題。
3.8 Kafka消費(fèi)者開始位置配置Flink Kafka Consumer允許配置如何確定Kafka分區(qū)的起始位置。
Java
Scala
Flink Kafka Consumer的所有版本都具有上述明確的起始位置配置方法。
setStartFromGroupOffsets(默認(rèn)行為)
從group.idKafka代理(或Zookeeper for Kafka 0.8)中的消費(fèi)者組(在消費(fèi)者屬性中設(shè)置)提交的偏移量開始讀取分區(qū)。如果找不到分區(qū)的偏移量,auto.offset.reset將使用屬性中的設(shè)置。
setStartFromEarliest()/ setStartFromLatest()
從最早/最新記錄開始。在這些模式下,Kafka中的承諾偏移將被忽略,不會(huì)用作起始位置。
setStartFromTimestamp(long)
從指定的時(shí)間戳開始。對于每個(gè)分區(qū),時(shí)間戳大于或等于指定時(shí)間戳的記錄將用作起始位置。如果分區(qū)的最新記錄早于時(shí)間戳,則只會(huì)從最新記錄中讀取分區(qū)。在此模式下,Kafka中的已提交偏移將被忽略,不會(huì)用作起始位置。
還可以指定消費(fèi)者應(yīng)從每個(gè)分區(qū)開始的確切偏移量:
Java
Scala
上面的示例將使用者配置為從主題的分區(qū)0,1和2的指定偏移量開始myTopic。偏移值應(yīng)該是消費(fèi)者應(yīng)為每個(gè)分區(qū)讀取的下一條記錄。請注意,如果使用者需要讀取在提供的偏移量映射中沒有指定偏移量的分區(qū),則它將回退到setStartFromGroupOffsets()該特定分區(qū)的默認(rèn)組偏移行為(即)。
請注意,當(dāng)作業(yè)從故障中自動(dòng)恢復(fù)或使用保存點(diǎn)手動(dòng)恢復(fù)時(shí),這些起始位置配置方法不會(huì)影響起始位置。在恢復(fù)時(shí),每個(gè)Kafka分區(qū)的起始位置由存儲(chǔ)在保存點(diǎn)或檢查點(diǎn)中的偏移量確定。
3.9 Kafka生產(chǎn)者和容錯(cuò) Kafka 0.8在0.9之前,Kafka沒有提供任何機(jī)制來保證至少一次或恰好一次的語義。
Kafka 0.9和0.10啟用Flink的檢查點(diǎn)時(shí),F(xiàn)linkKafkaProducer09和FlinkKafkaProducer010 能提供至少一次傳輸保證。
除了開啟Flink的檢查點(diǎn),還應(yīng)該配置setter方法:
setLogFailuresOnly(boolean)
默認(rèn)為false。啟用此選項(xiàng)將使生產(chǎn)者僅記錄失敗日志而不是捕獲和重新拋出它們。這大體上就是計(jì)數(shù)已成功的記錄,即使它從未寫入目標(biāo)Kafka主題。這必須設(shè)為false對于確保 至少一次
setFlushOnCheckpoint(boolean)
默認(rèn)為true。啟用此函數(shù)后,F(xiàn)link的檢查點(diǎn)將在檢查點(diǎn)成功之前等待檢查點(diǎn)時(shí)的任何動(dòng)態(tài)記錄被Kafka確認(rèn)。這可確保檢查點(diǎn)之前的所有記錄都已寫入Kafka。必須開啟,對于確保 至少一次
總之,默認(rèn)情況下,Kafka生成器對版本0.9和0.10具有至少一次保證,即
setLogFailureOnly設(shè)置為false和setFlushOnCheckpoint設(shè)置為true。
默認(rèn)情況下,重試次數(shù)設(shè)置為“0”。這意味著當(dāng)setLogFailuresOnly設(shè)置為時(shí)false,生產(chǎn)者會(huì)立即失敗,包括Leader更改。Kafka >= 0.11
默認(rèn)情況下,該值設(shè)置為“0”,以避免重試導(dǎo)致目標(biāo)主題中出現(xiàn)重復(fù)消息。對于經(jīng)常更改代理的大多數(shù)生產(chǎn)環(huán)境,建議將重試次數(shù)設(shè)置為更高的值。
Kafka目前沒有生產(chǎn)者事務(wù),因此Flink在Kafka主題里無法保證恰好一次交付
啟用Flink的檢查點(diǎn)后,F(xiàn)linkKafkaProducer011
對于Kafka >= 1.0.0版本是FlinkKafkaProduce
可以提供準(zhǔn)確的一次交付保證。
除了啟用Flink的檢查點(diǎn),還可以通過將適當(dāng)?shù)恼Z義參數(shù)傳遞給FlinkKafkaProducer011,選擇三種不同的算子操作模式
Semantic.NONE
Flink啥都不保證。生成的記錄可能會(huì)丟失,也可能會(huì)重復(fù)。
Semantic.AT_LEAST_ONCE(默認(rèn)設(shè)置)
類似于setFlushOnCheckpoint(true)在 FlinkKafkaProducer010。這可以保證不會(huì)丟失任何記錄(盡管它們可以重復(fù))。
Semantic.EXACTLY_ONCE
使用Kafka事務(wù)提供恰好一次的語義。每當(dāng)您使用事務(wù)寫入Kafka時(shí),不要忘記為任何從Kafka消費(fèi)記錄的應(yīng)用程序設(shè)置所需的isolation.level(read_committed 或read_uncommitted- 后者為默認(rèn)值)。
注意事項(xiàng)Semantic.EXACTLY_ONCE 模式依賴于在從所述檢查點(diǎn)恢復(fù)之后提交在獲取檢查點(diǎn)之前啟動(dòng)的事務(wù)的能力。如果Flink應(yīng)用程序崩潰和完成重啟之間的時(shí)間較長,那么Kafka的事務(wù)超時(shí)將導(dǎo)致數(shù)據(jù)丟失(Kafka將自動(dòng)中止超過超時(shí)時(shí)間的事務(wù))。考慮到這一點(diǎn),請根據(jù)預(yù)期的停機(jī)時(shí)間適當(dāng)配置事務(wù)超時(shí)。
Kafka broker默認(rèn) transaction.max.timeout.ms 設(shè)置為15分鐘。此屬性不允許為生產(chǎn)者設(shè)置大于其值的事務(wù)超時(shí)。
FlinkKafkaProducer011默認(rèn)情況下,將_transaction.timeout.msproducer config_中的屬性設(shè)置為1小時(shí),因此_transaction.max.timeout.ms_在使用 Semantic.EXACTLY_ONCE 模式之前應(yīng)該增加 該屬性。
在_read_committed_模式中KafkaConsumer,任何未完成的事務(wù)(既不中止也不完成)將阻止來自給定Kafka主題的所有讀取超過任何未完成的事務(wù)。換言之,遵循以下事件順序:
用戶事務(wù)1開啟并寫記錄
用戶事務(wù)2開啟并寫了一些其他記錄
用戶提交事務(wù)2
即使事務(wù)2已經(jīng)提交了記錄,在事務(wù)1提交或中止之前,消費(fèi)者也不會(huì)看到它們。這有兩個(gè)含義:
首先,在Flink應(yīng)用程序的正常工作期間,用戶可以預(yù)期Kafka主題中生成的記錄的可見性會(huì)延遲,等于已完成檢查點(diǎn)之間的平均時(shí)間。
其次,在Flink應(yīng)用程序失敗的情況下,讀者將阻止此應(yīng)用程序編寫的主題,直到應(yīng)用程序重新啟動(dòng)或配置的事務(wù)超時(shí)時(shí)間過去為止。此注釋僅適用于有多個(gè)代理/應(yīng)用程序?qū)懭胪籏afka主題的情況。
Semantic.EXACTLY_ONCE 模式為每個(gè)FlinkKafkaProducer011實(shí)例使用固定大小的KafkaProducers池。每個(gè)檢查點(diǎn)使用其中一個(gè)生產(chǎn)者。如果并發(fā)檢查點(diǎn)的數(shù)量超過池大小,F(xiàn)linkKafkaProducer011 將引發(fā)異常并將使整個(gè)應(yīng)用程序失敗。請相應(yīng)地配置最大池大小和最大并發(fā)檢查點(diǎn)數(shù)。3.10 Kafka消費(fèi)者及其容錯(cuò)
Semantic.EXACTLY_ONCE 采取所有可能的措施,不要留下任何阻礙消費(fèi)者閱讀Kafka主題的延遲事務(wù),這是必要的。但是,如果Flink應(yīng)用程序在第一個(gè)檢查點(diǎn)之前失敗,則在重新啟動(dòng)此類應(yīng)用程序后,系統(tǒng)中沒有關(guān)于先前池大小的信息。因此,在第一個(gè)檢查點(diǎn)完成之前按比例縮小Flink應(yīng)用程序是不安全的 _FlinkKafkaProducer011.SAFE_SCALE_DOWN_FACTOR_。
啟用Flink的檢查點(diǎn)后,F(xiàn)link Kafka Consumer將使用主題中的記錄,并以一致的方式定期檢查其所有Kafka偏移以及其他 算子操作的狀態(tài)。如果作業(yè)失敗,F(xiàn)link會(huì)將流式程序恢復(fù)到最新檢查點(diǎn)的狀態(tài),并從存儲(chǔ)在檢查點(diǎn)中的偏移量開始重新使用來自Kafka的記錄。
因此,繪制檢查點(diǎn)的間隔定義了程序在發(fā)生故障時(shí)最多可以返回多少。
檢查點(diǎn)常用參數(shù) enableCheckpointing啟用流式傳輸作業(yè)的檢查點(diǎn)。 將定期快照流式數(shù)據(jù)流的分布式狀態(tài)。 如果發(fā)生故障,流數(shù)據(jù)流將從最新完成的檢查點(diǎn)重新啟動(dòng)。
該作業(yè)在給定的時(shí)間間隔內(nèi)定期繪制檢查點(diǎn)。 狀態(tài)將存儲(chǔ)在配置的狀態(tài)后端。
此刻未正確支持檢查點(diǎn)迭代流數(shù)據(jù)流。 如果“force”參數(shù)設(shè)置為true,則系統(tǒng)仍將執(zhí)行作業(yè)。setCheckpointingMode setCheckpointTimeout setMaxConcurrentCheckpoints
要使用容錯(cuò)的Kafka使用者,需要在運(yùn)行環(huán)境中啟用拓?fù)涞臋z查點(diǎn):
Scala
Java
另請注意,如果有足夠的處理插槽可用于重新啟動(dòng)拓?fù)洌瑒tFlink只能重新啟動(dòng)拓?fù)洹R虼耍绻負(fù)溆捎趤G失了TaskManager而失敗,那么之后仍然必須有足夠的可用插槽。YARN上的Flink支持自動(dòng)重啟丟失的YARN容器。
如果未啟用檢查點(diǎn),Kafka使用者將定期向Zookeeper提交偏移量。
參考Streaming Connectors
Kafka官方文檔
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/75588.html
摘要:從上面自定義的可以看到我們繼承的就是這個(gè)類,那么來了解一下一個(gè)抽象類,繼承自。該類的子類有三個(gè),兩個(gè)是抽象類,在此基礎(chǔ)上提供了更具體的實(shí)現(xiàn),另一個(gè)是。 showImg(https://segmentfault.com/img/remote/1460000016978900?w=1920&h=1641); 前言 在 《從0到1學(xué)習(xí)Flink》—— Data Source 介紹 文章中,我...
摘要:每個(gè)在簡潔性和表達(dá)性之間提供不同的權(quán)衡,并針對不同的用例。在這些中處理的數(shù)據(jù)類型在相應(yīng)的編程語言中表示為類。該是為中心的聲明性表,其可被動(dòng)態(tài)地改變的表表示流時(shí)。這種抽象在語義和表達(dá)方面類似于,但是將程序表示為查詢表達(dá)式。 1 意義 1.1 分層的 APIs & 抽象層次 Flink提供三層API。 每個(gè)API在簡潔性和表達(dá)性之間提供不同的權(quán)衡,并針對不同的用例。 showImg(ht...
摘要:基于開發(fā)指南如果基于進(jìn)行應(yīng)用開發(fā),需要在文件中加入如下配置注解注意修改的值,確保其符合您的應(yīng)用。應(yīng)用開發(fā)完成后,可以直接直接運(yùn)行方法,在本地進(jìn)行基本的測試。基于gradle開發(fā)指南如果基于gradle進(jìn)行應(yīng)用開發(fā),需要在build.gradle文件中加入如下配置:buildscript { repositories { jcenter() // this applie...
閱讀 879·2021-10-25 09:45
閱讀 3306·2021-09-22 14:58
閱讀 3861·2021-08-31 09:43
閱讀 925·2019-08-30 15:55
閱讀 926·2019-08-29 13:51
閱讀 1237·2019-08-29 13:02
閱讀 3493·2019-08-29 12:52
閱讀 1968·2019-08-26 13:27