摘要:前言前篇文章從到學習介紹介紹了,也介紹了自帶的,那么如何自定義自己的呢這篇文章將寫一個教大家將從的數據到中去。
前言
前篇文章 《從0到1學習Flink》—— Data Sink 介紹 介紹了 Flink Data Sink,也介紹了 Flink 自帶的 Sink,那么如何自定義自己的 Sink 呢?這篇文章將寫一個 demo 教大家將從 Kafka Source 的數據 Sink 到 MySQL 中去。
準備工作我們先來看下 Flink 從 Kafka topic 中獲取數據的 demo,首先你需要安裝好了 FLink 和 Kafka 。
運行啟動 Flink、Zookepeer、Kafka,
好了,都啟動了!
數據庫建表DROP TABLE IF EXISTS `student`; CREATE TABLE `student` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(25) COLLATE utf8_bin DEFAULT NULL, `password` varchar(25) COLLATE utf8_bin DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;實體類
Student.java
package com.zhisheng.flink.model; /** * Desc: * weixin: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class Student { public int id; public String name; public String password; public int age; public Student() { } public Student(int id, String name, String password, int age) { this.id = id; this.name = name; this.password = password; this.age = age; } @Override public String toString() { return "Student{" + "id=" + id + ", name="" + name + """ + ", password="" + password + """ + ", age=" + age + "}"; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }工具類
工具類往 kafka topic student 發送數據
import com.alibaba.fastjson.JSON; import com.zhisheng.flink.model.Metric; import com.zhisheng.flink.model.Student; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * 往kafka中寫數據 * 可以使用這個main函數進行測試一下 * weixin: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class KafkaUtils2 { public static final String broker_list = "localhost:9092"; public static final String topic = "student"; //kafka topic 需要和 flink 程序用同一個 topic public static void writeToKafka() throws InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", broker_list); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer producer = new KafkaProducerSinkToMySQL(props); for (int i = 1; i <= 100; i++) { Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i); ProducerRecord record = new ProducerRecord (topic, null, null, JSON.toJSONString(student)); producer.send(record); System.out.println("發送數據: " + JSON.toJSONString(student)); } producer.flush(); } public static void main(String[] args) throws InterruptedException { writeToKafka(); } }
該類就是 Sink Function,繼承了 RichSinkFunction ,然后重寫了里面的方法。在 invoke 方法中將數據插入到 MySQL 中。
package com.zhisheng.flink.sink; import com.zhisheng.flink.model.Student; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; /** * Desc: * weixin: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class SinkToMySQL extends RichSinkFunctionFlink 程序{ PreparedStatement ps; private Connection connection; /** * open() 方法中建立連接,這樣不用每次 invoke 的時候都要建立連接和釋放連接 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); //關閉連接和釋放資源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * 每條數據的插入都要調用一次 invoke() 方法 * * @param value * @param context * @throws Exception */ @Override public void invoke(Student value, Context context) throws Exception { //組裝數據,執行插入操作 ps.setInt(1, value.getId()); ps.setString(2, value.getName()); ps.setString(3, value.getPassword()); ps.setInt(4, value.getAge()); ps.executeUpdate(); } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456"); } catch (Exception e) { System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage()); } return con; } }
這里的 source 是從 kafka 讀取數據的,然后 Flink 從 Kafka 讀取到數據(JSON)后用阿里 fastjson 來解析成 student 對象,然后在 addSink 中使用我們創建的 SinkToMySQL,這樣就可以把數據存儲到 MySQL 了。
package com.zhisheng.flink; import com.alibaba.fastjson.JSON; import com.zhisheng.flink.model.Student; import com.zhisheng.flink.sink.SinkToMySQL; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011; import java.util.Properties; /** * Desc: * weixin: zhisheng_tian * blog: http://www.54tianzhisheng.cn/ */ public class Main3 { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("zookeeper.connect", "localhost:2181"); props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); SingleOutputStreamOperator結果student = env.addSource(new FlinkKafkaConsumer011<>( "student", //這個 kafka topic 需要和上面的工具類的 topic 一致 new SimpleStringSchema(), props)).setParallelism(1) .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 對象 student.addSink(new SinkToMySQL()); //數據 sink 到 mysql env.execute("Flink add sink"); } }
運行 Flink 程序,然后再運行 KafkaUtils2.java 工具類,這樣就可以了。
如果數據插入成功了,那么我們查看下我們的數據庫:
數據庫中已經插入了 100 條我們從 Kafka 發送的數據了。證明我們的 SinkToMySQL 起作用了。是不是很簡單?
項目結構怕大家不知道我的項目結構,這里發個截圖看下:
最后本文主要利用一個 demo,告訴大家如何自定義 Sink Function,將從 Kafka 的數據 Sink 到 MySQL 中,如果你項目中有其他的數據來源,你也可以換成對應的 Source,也有可能你的 Sink 是到其他的地方或者其他不同的方式,那么依舊是這個套路:繼承 RichSinkFunction 抽象類,重寫 invoke 方法。
關注我轉載請務必注明原創地址為:http://www.54tianzhisheng.cn/2018/10/31/flink-create-sink/
另外我自己整理了些 Flink 的學習資料,目前已經全部放到微信公眾號了。你可以加我的微信:zhisheng_tian,然后回復關鍵字:Flink 即可無條件獲取到。
相關文章1、《從0到1學習Flink》—— Apache Flink 介紹
2、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境并構建運行簡單程序入門
3、《從0到1學習Flink》—— Flink 配置文件詳解
4、《從0到1學習Flink》—— Data Source 介紹
5、《從0到1學習Flink》—— 如何自定義 Data Source ?
6、《從0到1學習Flink》—— Data Sink 介紹
7、《從0到1學習Flink》—— 如何自定義 Data Sink ?
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/72128.html
摘要:從上圖可以看到接口有方法,它有一個抽象類。上面的那些自帶的可以看到都是繼承了抽象類,實現了其中的方法,那么我們要是自己定義自己的的話其實也是要按照這個套路來做的。 showImg(https://segmentfault.com/img/remote/1460000016956595); 前言 再上一篇文章中 《從0到1學習Flink》—— Data Source 介紹 講解了 Fli...
摘要:這些切片稱為窗口。函數允許對常規數據流進行分組。通常,這是非并行數據轉換,因為它在非分區數據流上運行。 showImg(https://segmentfault.com/img/remote/1460000017874226?w=1920&h=1271); 前言 在第一篇介紹 Flink 的文章 《《從0到1學習Flink》—— Apache Flink 介紹》 中就說過 Flink ...
閱讀 3331·2019-08-29 16:17
閱讀 1986·2019-08-29 15:31
閱讀 2656·2019-08-29 14:09
閱讀 2556·2019-08-26 13:52
閱讀 753·2019-08-26 12:21
閱讀 2150·2019-08-26 12:08
閱讀 1001·2019-08-23 17:08
閱讀 1934·2019-08-23 16:59