摘要:已更新至,歸管了,因此也相應統一。本文不再適用及以上版本。字段類型會非常非常奇葩。。。。但是如果體積過于龐大,很容易導致特別是我們一般不會給配置過高的內存。第二個,是函數的返回值。對于而言,我們可以直接使用,來得到這個什么都沒有的東西。
Spark已更新至2.x,DataFrame歸DataSet管了,因此API也相應統一。本文不再適用2.0.0及以上版本。
DataFrame原生支持直接輸出到JDBC,但如果目標表有自增字段(比如id),那么DataFrame就不能直接進行寫入了。因為DataFrame.write().jdbc()要求DataFrame的schema與目標表的表結構必須完全一致(甚至字段順序都要一致),否則會拋異常,當然,如果你SaveMode選擇了Overwrite,那么Spark刪除你原有的表,然后根據DataFrame的Schema生成一個。。。。字段類型會非常非常奇葩。。。。
于是我們只能通過DataFrame.collect(),把整個DataFrame轉成List
翻看Spark的JDBC源碼,發現實際上是通過foreachPartition方法,在DataFrame每一個分區中,對每個Row的數據進行JDBC插入,那么為什么我們就不能直接用呢?
Spark JdbcUtils.scala部分源碼:
def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) { val dialect = JdbcDialects.get(url) val nullTypes: Array[Int] = df.schema.fields.map { field => dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse( field.dataType match { case IntegerType => java.sql.Types.INTEGER case LongType => java.sql.Types.BIGINT case DoubleType => java.sql.Types.DOUBLE case FloatType => java.sql.Types.REAL case ShortType => java.sql.Types.INTEGER case ByteType => java.sql.Types.INTEGER case BooleanType => java.sql.Types.BIT case StringType => java.sql.Types.CLOB case BinaryType => java.sql.Types.BLOB case TimestampType => java.sql.Types.TIMESTAMP case DateType => java.sql.Types.DATE case t: DecimalType => java.sql.Types.DECIMAL case _ => throw new IllegalArgumentException( s"Can"t translate null value for field $field") }) } val rddSchema = df.schema val driver: String = DriverRegistry.getDriverClassName(url) val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties) // ****************** here ****************** df.foreachPartition { iterator => savePartition(getConnection, table, iterator, rddSchema, nullTypes) } }
嗯。。。既然Scala能實現,那么作為他的爸爸,Java也應該能玩!
我們看看foreachPartition的方法原型:
def foreachPartition(f: Iterator[Row] => Unit)
又是函數式語言最愛的匿名函數。。。非常討厭寫lambda,所以我們還是實現個匿名類吧。要實現的抽象類為:
scala.runtime.AbstractFunction1
來玩耍一下吧!
df.foreachPartition(new AbstractFunction1, BoxedUnit>() { @Override public BoxedUnit apply(Iterator it) { while (it.hasNext()){ System.out.println(it.next().toString()); } return BoxedUnit.UNIT; } });
嗯,maven complete一下,spark-submit看看~
好勒~拋異常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想想之前實現UDF的時候,UDF1/2/3/4...各接口,都extends Serializable,也就是說,在Spark運行期間,Driver會把UDF接口實現類序列化,并在Executor中反序列化,執行call方法。。。這就不難理解了,我們foreachPartition丟進去的類,也應該implements Serializable。這樣,我們就得自己搞一個繼承AbstractFunction1
import org.apache.spark.sql.Row; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable { }
可是每次都要return BoxedUnit.UNIT 搞得太別扭了,沒一點Java的風格。
import org.apache.spark.sql.Row; import scala.collection.Iterator; import scala.runtime.AbstractFunction1; import scala.runtime.BoxedUnit; import java.io.Serializable; public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable { @Override public BoxedUnit apply(Iterator it) { call(it); return BoxedUnit.UNIT; } public abstract void call(Iterator
it); }
于是我們可以直接Override call方法,就可以用滿滿Java Style的代碼去玩耍了!
df.foreachPartition(new JavaForeachPartitionFunc() { @Override public void call(Iteratorit) { while (it.hasNext()){ System.out.println(it.next().toString()); } } });
注意!我們實現的匿名類的方法,實際上是在executor上執行的,所以println是輸出到executor機器的stdout上。這個我們可以通過Spark的web ui,點擊具體Application的Executor頁面去查看(調試用的虛擬機集群,手扶拖拉機一樣的配置,別吐槽了~)
至于foreach方法同理。只不過把Iterator
have fun~
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/65979.html
摘要:是中處理結構化數據的模塊。可以從很多數據源加載數據并構造得到,如結構化數據文件,中的表,外部數據庫,或者已有的。使用反射機制,推導包含指定類型對象的。這一功能應該優先于使用。隨后,將會掃描必要的列,并自動調整壓縮比例,以減少內存占用和壓力。 Spark SQL是Spark中處理結構化數據的模塊。與基礎的Spark RDD API不同,Spark SQL的接口提供了更多關于數據的結構信息...
摘要:本文發于我的個人博客知識點大全與實戰我正在大數據技術派和朋友們討論有趣的話題,你也來加入吧概述什么是是用于結構化數據處理的模塊。是最新的查詢起始點,實質上是和的組合,所以在和上可用的在上同樣是可以使用的。 關注公眾號:大數據技術派,回復資料,領取1000G資料。本文發于我的個人博客:Spark SQL知識點大全...
摘要:是最新的查詢起始點,實質上是和的組合,所以在和上可用的在上同樣是可以使用的。轉換為轉換為其實就是對的封裝,所以可以直接獲取內部的注意此時得到的存儲類型為是具有強類型的數據集合,需要提供對應的類型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于結構化數據(structured data)處理的Spark模塊。與基本的Spark RDD API不同,Sp...
閱讀 1163·2021-11-24 09:38
閱讀 3610·2021-11-22 15:32
閱讀 3461·2019-08-30 15:54
閱讀 2574·2019-08-30 15:53
閱讀 1499·2019-08-30 15:52
閱讀 2541·2019-08-30 13:15
閱讀 1843·2019-08-29 12:21
閱讀 1405·2019-08-26 18:36