国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

spark Dstreams-kafka數據源

IT那活兒 / 485人閱讀
spark Dstreams-kafka數據源

點擊上方“IT那活兒”,關注后了解更多內容,不管IT什么活兒,干就完了!!!


01


簡   介


Spark Streaming+Kafka集成在實際應用中是非常常見的,其中kafka需要是0.10.0版本及以上。Kafka 0.10的Spark Streaming集成提供了簡單的并行性、Kafka分區和Spark分區之間的1:1對應關系以及對偏移量和元數據的訪
但是,由于較新的集成使用了新的Kafka consumer API而不是簡單的API,因此在使用上存在顯著差異。


02


案例及說明


首先需要添加依賴:
Stream中的每條記錄是一個ConsumerRecord實體。如果Spark batch持續時間大于默認的Kafka心跳會話超時(30秒),請適當增加heartbeat.interval.ms和session.timeout.ms。
于大于5分鐘的批處理,這將需要在代理上更改group.max.session.timeout.ms。
新的Kafka消費API將把消息預取到緩沖區中。因此,出于性能原因,Spark integration將緩存的使用者保留在執行器上(而不是為每個批處理重新創建它們),并且更愿意在具有適當使用者的主機位置上調度分區,這一點很重要。
在大多數情況下,您應該使用LocationStrategies.PreferConsistent,如上所示。這將在可用的執行器之間均勻地分配分區。如果您的執行者與您的Kafka代理位于相同的主機上,請使用PreferBrokers,這將更傾向于在Kafka leader上為該分區安排分區。
最后,如果分區之間的負載有明顯的偏差,請使用PreferFixed。這允許您指定分區到主機的顯式映射(任何未指定的分區都將使用一致的位置)。
消費者的緩存的默認最大大小為64.如果您希望處理超過(64 *個執行程序數)Kafka分區,則可以通過spark.streaming.kafka.consumer.cache.maxCapacity更改此設置。
如果要禁用Kafka使用者的緩存,可以將spark.streaming.Kafka.consumer.cache.enabled設置為false。
緩存由topicpartition和group.id設置密鑰,因此對createDirectStream的每次調用使用多帶帶的group.id。
新的Kafka consumer API有許多不同的方法來指定主題,其中一些方法需要大量的對象實例化后設置。ConsumerStrategies提供了一個抽象,允許Spark即使在從檢查點重新啟動后也能獲得正確配置的使用者。
如上所示,Subscribe允許您訂閱固定的主題集合。SubscribePattern允許您使用正則表達式指定感興趣的主題。請注意,與0.8集成不同,使用Subscribe或SubscribePattern應該響應在運行流期間添加分區。最后,Assign允許您指定一個固定的分區集合這三種策略都有重載構造函數,允許您指定特定分區的起始偏移量。
如果你有一個更適合批處理的用例,那么可以創建RDD來定義偏移范圍:
獲取偏移量:
請注意,只有在createDirectStream結果上調用的第一個方法中,而不是在隨后的方法鏈中,才能成功地將類型轉換為HasOffsetRanges。
請注意,RDD分區和Kafka分區之間的一對一映射在任何洗牌或重新分區的方法(例如reduceByKey()或window())之后都不會保留。


03


偏移量管理


kafka在失敗情況下傳輸語義取決于偏移量的存儲方式和存儲時間,spark輸出操作至少一次,如果你想只有一次輸出,則必須在冪等輸出后存儲偏移量,或者在原子事務中與輸出一起存儲偏移量,通過這種集成,為了提高可靠性,您有三個選項來存儲偏移量。
1) Checkpoint
如果啟用checkpointing,偏移量將存儲在檢查點中。這很容易實現,但也有缺點。您的輸出操作必須是冪等的,因為您將得到重復的輸出;交易不是一種選擇。此外,如果應用程序代碼已更改,則無法從檢查點恢復。對于計劃的升級,您可以通過在舊代碼的同時運行新代碼來緩解這一問題(因為輸出無論如何都需要是冪等的,所以它們不應該沖突)。但對于需要更改代碼的計劃外故障,您將丟失數據,除非您有另一種方法來識別已知良好的起始偏移量。
2)kafka自己管理偏移量
Kafka有一個特殊的topic用來存儲偏移量,默認情況下,消費者會自動定期提交偏移量,但是這肯定不是你想要的,因為輪詢期間消息可能還未輸出,這就是上面的流示例將“enable.auto.commit”設置為false的原因,但是在知道輸出已存儲后,可以手動將偏移提交到Kafka,與檢查點相比,Kafka的好處在于無論應用程序代碼如何更改,它都是一個持久的存儲。然而,卡夫卡不是事務性的,所以您的輸出仍然必須是冪等的。
3)自定義存儲
對于支持事務的數據存儲,將偏移保存在與結果相同的事務中可以使兩者保持同步,即使在失敗的情況下也是如此。
如果在檢測重復或跳過的偏移量范圍時非常小心,則回滾事務可防止重復或丟失的消息影響結果。這給出了精確一次語義的等價物,甚至對于聚合產生的輸出也可以使用這種策略,聚合通常很難使其成為冪等的。



本文作者:潘宗昊

本文來源:IT那活兒(上海新炬王翦團隊)

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/129613.html

相關文章

  • Spark 快速入門

    摘要:數據科學任務主要是數據分析領域,數據科學家要負責分析數據并建模,具備統計預測建模機器學習等方面的經驗,以及一定的使用或語言進行編程的能力。監控運行時性能指標信息。 Spark Spark 背景 什么是 Spark 官網:http://spark.apache.org Spark是一種快速、通用、可擴展的大數據分析引擎,2009年誕生于加州大學伯克利分校AMPLab,2010年開源,20...

    wangshijun 評論0 收藏0
  • Spark SQL知識點與實戰

    摘要:是最新的查詢起始點,實質上是和的組合,所以在和上可用的在上同樣是可以使用的。轉換為轉換為其實就是對的封裝,所以可以直接獲取內部的注意此時得到的存儲類型為是具有強類型的數據集合,需要提供對應的類型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于結構化數據(structured data)處理的Spark模塊。與基本的Spark RDD API不同,Sp...

    番茄西紅柿 評論0 收藏2637

發表評論

0條評論

IT那活兒

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<