国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

flink學習系列--基礎知識(一)

Warren / 2952人閱讀

摘要:前言最近因公司業務需求,需要使用到大數據分析。提供的可用于處理無盡的數據流。類似于把一個記錄拆分成兩條三條甚至是四條記錄例如把一個字符串分割成一個字符數組。是一個聚合操作,如計數求和求平均等。實現把兩個流連成一個流。

前言

最近因公司業務需求,需要使用到大數據分析。選擇了flink,第一次聽說flink我也是很懵逼的狀態,不過一段時間下來有了一點心得,在這里和大家分享分享。有很多描述不準確的,大家多提提意見。

1.flink是什么,為什么要flink?

其實大數據框架有很多,比如Hadoop(批處理),Storm(流處理),Samza(流處理),Spark...但是我們選擇的是flink,為什么呢?因為flink是“流式批處理”,flink將每一項視作真正的數據流。Flink提供的DataStream API可用于處理無盡的數據流。Flink可配合使用的基本組件包括:

Stream(流)是指在系統中流轉的,永恒不變的無邊界數據集

Operator(操作方)是指針對數據流執行操作以產生其他數據流的功能

Source(源)是指數據流進入系統的入口點

Sink(槽)是指數據流離開Flink系統后進入到的位置,槽可以是數據庫或到其他系統的連接器

說了這么多,我們做一個簡單的demo來體驗一下flink:
假設我們在電商平臺,需要近實時(5min)統計(1h內)商品點擊量的前三名。然后實時展示出來。如果使用java,我們需要做一個定時任務,監聽商品點擊事件,然后每5min使用sql計算一下...如果數據量小,間隔時間比較長,還比較好,如果數據量大,間隔時間比較短...那服務器的壓力就會賊大...但是使用flink會怎么樣呢?先看下代碼(40幾W條數據從阿里淘寶獲取,github上):

/**

Created by liuliang

on 2019/5/24

*/
public class HotItems {

public static void main(String[] args) throws Exception {

    // 創建 execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 告訴系統按照 EventTime 處理
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // 為了打印到控制臺的結果不亂序,配置全局的并發為1,改變并發對結果正確性沒有影響
    env.setParallelism(1);

    // URL fileUrl = HotItems.class.getClassLoader().getResource("D:mftcodesflink-learnningsrcmainjavacncrawlermftUserBehavior.csv");
    Path filePath = Path.fromLocalFile(new File("D:mftcodesflink-learnningsrcmainjavacncrawlermftUserBehavior.csv"));

    // 抽取 UserBehavior 的 TypeInformation,是一個 PojoTypeInfo
    PojoTypeInfo pojoType = (PojoTypeInfo) TypeExtractor.createTypeInfo(UserBehavior.class);


    // 由于 Java 反射抽取出的字段順序是不確定的,需要顯式指定下文件中字段的順序
    String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
    // 創建 PojoCsvInputFormat
    PojoCsvInputFormat csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);


    env
            // 創建數據源,得到 UserBehavior 類型的 DataStream
            .createInput(csvInput, pojoType)
            // 抽取出時間和生成 watermark
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
                @Override
                public long extractAscendingTimestamp(UserBehavior userBehavior) {
                    // 原始數據單位秒,將其轉成毫秒
                    return userBehavior.timestamp * 1000;
                }
            })
            // 過濾出只有點擊的數據
            .filter(new FilterFunction() {
                @Override
                public boolean filter(UserBehavior userBehavior) throws Exception {
                    // 過濾出只有點擊的數據
                    return userBehavior.behavior.equals("pv");
                }
            })
            .keyBy("itemId")
            .timeWindow(Time.minutes(60), Time.minutes(5))
            .aggregate(new CountAgg(), new WindowResultFunction())
            .keyBy("windowEnd")
            .process(new TopNHotItems(3))
            .print();

    env.execute("Hot Items Job");

}



/** 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串 */
public static class TopNHotItems extends KeyedProcessFunction {

    private final int topSize;

    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    // 用于存儲商品與點擊數的狀態,待收齊同一個窗口的數據后,再觸發 TopN 計算
    private ListState itemState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(
            ItemViewCount input,
            Context context,
            Collector collector) throws Exception {

        // 每條數據都保存到狀態中
        itemState.add(input);
        // 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬于windowEnd窗口的所有商品數據
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    @Override
    public void onTimer(
            long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        // 獲取收到的所有商品點擊量
        List allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // 提前清除狀態中的數據,釋放空間
        itemState.clear();
        // 按照點擊量從大到小排序
        allItems.sort(new Comparator() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return (int) (o2.viewCount - o1.viewCount);
            }
        });
        // 將排名信息格式化成 String, 便于打印
        StringBuilder result = new StringBuilder();
        result.append("====================================
");
        result.append("時間: ").append(new Timestamp(timestamp-1)).append("
");
        for (int i=0; i {

    @Override
    public void apply(
            Tuple key,  // 窗口的主鍵,即 itemId
            TimeWindow window,  // 窗口
            Iterable aggregateResult, // 聚合函數的結果,即 count 值
            Collector collector  // 輸出類型為 ItemViewCount
    ) throws Exception {
        Long itemId = ((Tuple1) key).f0;
        Long count = aggregateResult.iterator().next();
        collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
    }
}


/** COUNT 統計的聚合函數實現,每出現一條記錄加一 */
public static class CountAgg implements AggregateFunction {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(UserBehavior userBehavior, Long acc) {
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}


/** 商品點擊量(窗口操作的輸出類型) */
public static class ItemViewCount {
    public long itemId;     // 商品ID
    public long windowEnd;  // 窗口結束時間戳
    public long viewCount;  // 商品的點擊量

    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
        ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.windowEnd = windowEnd;
        result.viewCount = viewCount;
        return result;
    }
}



/** 用戶行為數據結構 **/
public static class UserBehavior {
    public long userId;         // 用戶ID
    public long itemId;         // 商品ID
    public int categoryId;      // 商品類目ID
    public String behavior;     // 用戶行為, 包括("pv", "buy", "cart", "fav")
    public long timestamp;      // 行為發生的時間戳,單位秒
}

}

實時模擬的結果:

====================================
時間: 2017-11-26 09:05:00.0
No0:  商品ID=5051027  瀏覽量=3
No1:  商品ID=3493253  瀏覽量=3
No2:  商品ID=4261030  瀏覽量=3
====================================


====================================
時間: 2017-11-26 09:10:00.0
No0:  商品ID=812879  瀏覽量=5
No1:  商品ID=2600165  瀏覽量=4
No2:  商品ID=2828948  瀏覽量=4
====================================


====================================
時間: 2017-11-26 09:15:00.0
No0:  商品ID=812879  瀏覽量=7
No1:  商品ID=138964  瀏覽量=5
No2:  商品ID=4568476  瀏覽量=5
====================================


====================================
時間: 2017-11-26 09:20:00.0
No0:  商品ID=812879  瀏覽量=8
No1:  商品ID=2338453  瀏覽量=8
No2:  商品ID=2563440  瀏覽量=7
====================================

可以看到,我們用比較簡單的代碼,就實現了熱點TOP n的問題.可見flink使用起來還是很方便的(至少比java方便不少)。

2.flink這么強大?為甚?

從上一個例子里面,我們已經初步體會到了flink的方便之處。我想從一下幾個方面解釋一下:

支持多種窗口

支持table api 【第二講介紹】

exactly-once (正好一次) 【第二講介紹】

1. 支持多種窗口
1.1 關于flink窗口我手動畫了一個簡單的圖:

1.2flink窗口函數
窗口函數就是這四個:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction.(當然也可以自定義window)

3.flink工作流程?
dataSource ->  DataTransformation(*)  ->dataSink

3.1 登陸監控demo了解 dataSource和dataSink

    dataSource:
        基于本地集合的 source、基于文件的 source、基于網絡套接字的 source、自定義的 source
        自定義source:
        a:flink提供了很多定義好的sourceFunction 比如Kafka,RabbitMq,Mysql...
        b:StreamExecutionEnvironment.addSource(sourceFunction) 自己寫sourceFunction 
          (實現ParallelSourceFunction / RichParallelSourceFunction )
    dataSink:
        寫入文件、打印出來、寫入 socket 、自定義的 sink 
        自定義的sink 
        a:同理,dataSink提供了很多定義好的dataSink...
        b:自定義dataSink

3.2 DataTransformation(*)

    簡單的Transformation示意圖【圖2】
    Transformation:數據轉換的各種操作,有 Map / FlatMap / Filter / KeyBy / Reduce /
    Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,
    操作很多,可以將數據轉換計算成你想要的數據。
    hello-demo
    注【1】

4.flink在我們測試環境上集成的demo

    1:登陸異地監控 (講清楚架構關系)
    2:代理樹
    

5.flink怎么發布?web操作界面簡單介紹。

打jar包,設置參數(并發度,main函數等),上傳


注:
【1】
map就是做一些映射,比如我們把兩個字符串合并成一個字符串,把一個字符串拆成兩個或者三個字符串。
flatMap類似于把一個記錄拆分成兩條、三條、甚至是四條記錄,例如把一個字符串分割成一個字符數組。
Filter就類似于過濾。
keyBy就等效于SQL里的group by。
aggregate是一個聚合操作,如計數、求和、求平均等。
reduce就類似于MapReduce里的reduce。
join操作就有點類似于我們數據庫里面的join。
connect實現把兩個流連成一個流。
repartition是一個重新分區操作(還沒研究)。
project操作就類似于SQL里面的snacks(還沒研究)

【以上涉及到的代碼,我已經上傳到github上面:https://github.com/iamcrawler...】

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/74910.html

相關文章

  • flink學習系列--基礎知識學習(四)

    摘要:前言這一講將介紹一下序列化機制和過程函數。然而由于的類型擦除,自動提取并不是總是有效。開發者在自定義類上使用注解,隨后創建相應的并覆蓋方法。 前言 這一講將介紹一下序列化機制和過程函數(processfunction)。 序列化機制 使用 Flink 編寫處理邏輯時,新手總是容易被林林總總的概念所混淆: 為什么 Flink 有那么多的類型聲明方式? BasicTypeInfo.ST...

    piglei 評論0 收藏0
  • Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)

    摘要:由于配置流是從關系型數據庫中讀取,速度較慢,導致實時數據流流入數據的時候,配置信息還未發送,這樣會導致有些實時數據讀取不到配置信息。從數據庫中解析出來,再去統計近兩周占比。 showImg(https://segmentfault.com/img/remote/1460000019367651); Flink 學習項目代碼 https://github.com/zhisheng17/f...

    Dr_Noooo 評論0 收藏0
  • 取之開源,用之開源——深度剖析阿里巴巴對Apache Flink的優化與改進

    摘要:基于在阿里巴巴搭建的平臺于年正式上線,并從阿里巴巴的搜索和推薦這兩大場景開始實現。在經過一番調研之后,阿里巴巴實時計算認為是一個非常適合的選擇。接下來,我們聊聊阿里巴巴在層對又大刀闊斧地進行了哪些改進。 Apache Flink 概述 Apache Flink(以下簡稱Flink)是誕生于歐洲的一個大數據研究項目,原名StratoSphere。該項目是柏林工業大學的一個研究性項目,早期...

    YJNldm 評論0 收藏0

發表評論

0條評論

Warren

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<