国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Spark Java使用DataFrame的foreach/foreachPartition

Jrain / 2356人閱讀

摘要:已更新至,歸管了,因此也相應統一。本文不再適用及以上版本。字段類型會非常非常奇葩。。。。但是如果體積過于龐大,很容易導致特別是我們一般不會給配置過高的內存。第二個,是函數的返回值。對于而言,我們可以直接使用,來得到這個什么都沒有的東西。

Spark已更新至2.x,DataFrame歸DataSet管了,因此API也相應統一。本文不再適用2.0.0及以上版本。

DataFrame原生支持直接輸出到JDBC,但如果目標表有自增字段(比如id),那么DataFrame就不能直接進行寫入了。因為DataFrame.write().jdbc()要求DataFrame的schema與目標表的表結構必須完全一致(甚至字段順序都要一致),否則會拋異常,當然,如果你SaveMode選擇了Overwrite,那么Spark刪除你原有的表,然后根據DataFrame的Schema生成一個。。。。字段類型會非常非常奇葩。。。。
于是我們只能通過DataFrame.collect(),把整個DataFrame轉成List到Driver上,然后通過原生的JDBC方法進行寫入。但是如果DataFrame體積過于龐大,很容易導致Driver OOM(特別是我們一般不會給Driver配置過高的內存)。這個問題真的很讓人糾結。
翻看Spark的JDBC源碼,發現實際上是通過foreachPartition方法,在DataFrame每一個分區中,對每個Row的數據進行JDBC插入,那么為什么我們就不能直接用呢?

Spark JdbcUtils.scala部分源碼:

  def saveTable(df: DataFrame,url: String,table: String,properties: Properties = new Properties()) {
    val dialect = JdbcDialects.get(url)
    val nullTypes: Array[Int] = df.schema.fields.map { field =>
      dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
        field.dataType match {
          case IntegerType => java.sql.Types.INTEGER
          case LongType => java.sql.Types.BIGINT
          case DoubleType => java.sql.Types.DOUBLE
          case FloatType => java.sql.Types.REAL
          case ShortType => java.sql.Types.INTEGER
          case ByteType => java.sql.Types.INTEGER
          case BooleanType => java.sql.Types.BIT
          case StringType => java.sql.Types.CLOB
          case BinaryType => java.sql.Types.BLOB
          case TimestampType => java.sql.Types.TIMESTAMP
          case DateType => java.sql.Types.DATE
          case t: DecimalType => java.sql.Types.DECIMAL
          case _ => throw new IllegalArgumentException(
            s"Can"t translate null value for field $field")
        })
    }

    val rddSchema = df.schema
    val driver: String = DriverRegistry.getDriverClassName(url)
    val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
    // ****************** here ****************** 
    df.foreachPartition { iterator =>
      savePartition(getConnection, table, iterator, rddSchema, nullTypes)
    }
  }
 

嗯。。。既然Scala能實現,那么作為他的爸爸,Java也應該能玩!
我們看看foreachPartition的方法原型:

def foreachPartition(f: Iterator[Row] => Unit)

又是函數式語言最愛的匿名函數。。。非常討厭寫lambda,所以我們還是實現個匿名類吧。要實現的抽象類為:
scala.runtime.AbstractFunction1,BoxedUnit> 兩個模板參數,第一個很直觀,就是Row的迭代器,作為函數的參數。第二個BoxedUnit,是函數的返回值。不熟悉Scala的可能會很困惑,其實這就是Scala的void。由于Scala函數式編程的特性,代碼塊的末尾必須返回點什么,于是他們就搞出了個unit來代替本應什么都沒有的void(解釋得可能不是很準確,我是這么理解的)。對于Java而言,我們可以直接使用BoxedUnit.UNIT,來得到這個“什么都沒有”的東西。
來玩耍一下吧!

df.foreachPartition(new AbstractFunction1, BoxedUnit>() {
    @Override
    public BoxedUnit apply(Iterator it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
        return BoxedUnit.UNIT;
    }
});

嗯,maven complete一下,spark-submit看看~
好勒~拋異常了
org.apache.spark.SparkException: Task not serializable
Task不能被序列化
嗯哼,想想之前實現UDF的時候,UDF1/2/3/4...各接口,都extends Serializable,也就是說,在Spark運行期間,Driver會把UDF接口實現類序列化,并在Executor中反序列化,執行call方法。。。這就不難理解了,我們foreachPartition丟進去的類,也應該implements Serializable。這樣,我們就得自己搞一個繼承AbstractFunction1, BoxedUnit>,又實現Serializable的抽象類,給我們這些匿名類去實現!

import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {
}

可是每次都要return BoxedUnit.UNIT 搞得太別扭了,沒一點Java的風格。

import org.apache.spark.sql.Row;
import scala.collection.Iterator;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

public abstract class JavaForeachPartitionFunc extends AbstractFunction1, BoxedUnit> implements Serializable {
    @Override
    public BoxedUnit apply(Iterator it) {
        call(it);
        return BoxedUnit.UNIT;
    }
    
    public abstract void call(Iterator it);
}

于是我們可以直接Override call方法,就可以用滿滿Java Style的代碼去玩耍了!

df.foreachPartition(new JavaForeachPartitionFunc() {
    @Override
    public void call(Iterator it) {
        while (it.hasNext()){
            System.out.println(it.next().toString());
        }
    }
});

注意!我們實現的匿名類的方法,實際上是在executor上執行的,所以println是輸出到executor機器的stdout上。這個我們可以通過Spark的web ui,點擊具體Application的Executor頁面去查看(調試用的虛擬機集群,手扶拖拉機一樣的配置,別吐槽了~)

至于foreach方法同理。只不過把Iterator 換成 Row。具體怎么搞,慢慢玩吧~~~
have fun~

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/65979.html

相關文章

  • Spark SQL學習筆記

    摘要:是中處理結構化數據的模塊。可以從很多數據源加載數據并構造得到,如結構化數據文件,中的表,外部數據庫,或者已有的。使用反射機制,推導包含指定類型對象的。這一功能應該優先于使用。隨后,將會掃描必要的列,并自動調整壓縮比例,以減少內存占用和壓力。 Spark SQL是Spark中處理結構化數據的模塊。與基礎的Spark RDD API不同,Spark SQL的接口提供了更多關于數據的結構信息...

    qieangel2013 評論0 收藏0
  • Spark SQL知識點大全與實戰

    摘要:本文發于我的個人博客知識點大全與實戰我正在大數據技術派和朋友們討論有趣的話題,你也來加入吧概述什么是是用于結構化數據處理的模塊。是最新的查詢起始點,實質上是和的組合,所以在和上可用的在上同樣是可以使用的。 關注公眾號:大數據技術派,回復資料,領取1000G資料。本文發于我的個人博客:Spark SQL知識點大全...

    番茄西紅柿 評論0 收藏2637
  • Spark SQL知識點與實戰

    摘要:是最新的查詢起始點,實質上是和的組合,所以在和上可用的在上同樣是可以使用的。轉換為轉換為其實就是對的封裝,所以可以直接獲取內部的注意此時得到的存儲類型為是具有強類型的數據集合,需要提供對應的類型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于結構化數據(structured data)處理的Spark模塊。與基本的Spark RDD API不同,Sp...

    番茄西紅柿 評論0 收藏2637

發表評論

0條評論

Jrain

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<