

Spark SQL Study Notes


Abstract: Spark SQL is the module in Spark for processing structured data. A DataFrame can be constructed from many data sources, such as structured data files, tables in Hive, external databases, or existing RDDs. Reflection can be used to infer the schema of an RDD that contains objects of a specific type. For accessing other databases, the JDBC data source should be preferred over JdbcRDD. When a table is cached, Spark SQL scans only the required columns and automatically tunes compression to reduce memory usage and GC pressure.

Spark SQL is the Spark module for processing structured data. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of the data and of the computation being performed. Spark SQL now offers three different APIs: SQL statements, the DataFrame API, and the newer Dataset API.
One use of Spark SQL is to execute SQL queries directly, written either in basic SQL syntax or in HiveQL. Spark SQL can also read data from an existing Hive installation.

A DataFrame is a distributed collection of data organized into named columns. Conceptually it is equivalent to a table in a relational database or a data frame in R/Python. DataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.

Dataset is a new API added in Spark 1.6. It aims to combine the benefits of RDDs (strong typing, the ability to use lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated with transformation operators such as map, flatMap, and filter.
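
A rough sketch (not from the original text) of that pattern in Java, assuming an existing SparkSession named spark:

import java.util.Arrays;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;

// Build a Dataset from local JVM objects, then apply typed transformations
Dataset<String> names = spark.createDataset(Arrays.asList("Michael", "Andy", "Justin"), Encoders.STRING());
names
  .filter((FilterFunction<String>) n -> n.startsWith("J"))                    // keep names starting with "J"
  .map((MapFunction<String, String>) String::toUpperCase, Encoders.STRING())  // upper-case them
  .show();                                                                    // expected: JUSTIN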

Entry Points: SQLContext and SparkSession

Before version 2.0, the entry point for all Spark SQL functionality was the SQLContext class and its subclasses.

val sc: SparkContext // an existing SparkContext object is assumed
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// brings RDD-to-DataFrame implicit conversions into scope
import sqlContext.implicits._
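
For Java code the same pre-2.0 entry point can be created from a JavaSparkContext; a minimal sketch (the application name is illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;

SparkConf conf = new SparkConf().setAppName("Spark SQL 1.x entry point");
JavaSparkContext jsc = new JavaSparkContext(conf);
// Pre-2.0 entry point; superseded by SparkSession in Spark 2.0+
SQLContext sqlContext = new SQLContext(jsc);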

From version 2.0 on, the entry point is SparkSession, built with SparkSession.builder():

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

Spark 2.0 introduced SparkSession with built-in support for Hive features, including writing queries in HiveQL, access to Hive UDFs, and reading data from Hive tables; you no longer need an existing Hive setup to use them. Previously, these features required depending on and using a HiveContext.
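
A minimal sketch of enabling Hive support on the 2.0+ entry point (the Hive Tables section later shows the full pattern with a warehouse location):

SparkSession spark = SparkSession
  .builder()
  .appName("Spark SQL with Hive support")
  .enableHiveSupport()   // replaces the old HiveContext; no pre-existing Hive deployment is required
  .getOrCreate();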

DataFrame

A DataFrame can be constructed from many data sources, such as structured data files, tables in Hive, external databases, or existing RDDs.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Since Spark 2.0, a DataFrame is simply a Dataset of Rows (in Java and Scala). DataFrames provide a domain-specific language for structured data manipulation.

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

For a complete list of operations, see the Dataset API documentation.
Datasets also support a rich set of string, date, math, and other functions; see the function reference.
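
As a rough, hedged illustration of those built-in functions (reusing the df DataFrame loaded above; the column aliases are made up for the example), helpers from org.apache.spark.sql.functions can be combined inside select:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.upper;
import static org.apache.spark.sql.functions.round;
import static org.apache.spark.sql.functions.current_date;

// Illustrative only: upper-case the name, halve and round the age, and attach today's date
df.select(
    upper(col("name")).alias("name_upper"),
    round(col("age").divide(2), 1).alias("half_age"),
    current_date().alias("today")
).show();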

Running SQL Queries Programmatically
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View - the temporary views created above are tied to the SparkSession and disappear when the session ends. To share a view across multiple sessions, use a global temporary view instead.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Dataset

The Dataset API is similar to RDDs, but instead of Java serialization or Kryo it uses a specialized Encoder to serialize objects for processing and network transfer. While both encoders and standard serialization turn objects into bytes, encoders are generated dynamically from code and use a format that lets Spark perform operations such as filtering, sorting, and hashing without deserializing the bytes back into objects.

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer call(Integer value) throws Exception {
    return value + 1;
  }
}, integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Interoperating with RDDs

Spark SQL provides two methods for converting an RDD into a DataFrame.

The first uses reflection to infer the schema of an RDD that contains objects of a specific type. This reflection-based approach leads to more concise code and is recommended when you already know the schema.

The second constructs a schema programmatically and then applies it to an existing RDD. This approach is more verbose, but it is necessary when the columns and their types are not known until runtime.

Inferring the Schema Using Reflection

Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. Currently, Spark SQL does not support JavaBeans that contain Map fields. Create a JavaBean class that implements Serializable.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(new Function<String, Person>() {
    @Override
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getAs("name");
  }
}, stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
Programmatically Specifying the Schema

To create a DataFrame programmatically, follow these three steps:

Create an RDD of Row objects from the original RDD.

Create a schema, represented by a StructType, that matches the structure of the Rows in the RDD from step 1.

Apply the schema to the RDD of Rows via the SparkSession.createDataFrame method.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
  @Override
  public Row call(String record) throws Exception {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
  }
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
Data Sources

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on like a normal RDD, and it can also be registered as a temporary table; once registered, you can run SQL queries over it. This section describes the general methods for loading and saving data, covering the different Spark data sources.
In the simplest form, all operations use the default data source (Parquet, unless the spark.sql.sources.default setting has been changed) to load data.

Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

You can also manually specify the data source, along with any extra options. Data sources are specified by their fully qualified name (e.g., org.apache.spark.sql.parquet), but built-in sources can use their short names (json, parquet, jdbc). A DataFrame loaded from any data source type can be converted to another format with the following syntax.

Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");

Running SQL directly on files: instead of loading a file with the read API, Spark SQL can also query the file directly with SQL.

Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
Save Modes

Save operations take an optional SaveMode parameter that specifies how to handle data that already exists. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the existing data is deleted before the new data is written out. A short sketch follows the list.

SaveMode.ErrorIfExists (default) "error" (default) When saving a DataFrame to a data source, if data already exists, an exception is thrown.

SaveMode.Append "append" If data or the table already exists, the contents of the DataFrame are appended to the existing data.

SaveMode.Overwrite "overwrite" If data or the table already exists, the existing data is overwritten by the contents of the DataFrame.

SaveMode.Ignore "ignore" If data already exists, the save operation does nothing and leaves the existing data unchanged, similar to CREATE TABLE IF NOT EXISTS in SQL.
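
A minimal, hedged sketch (the output path is illustrative) that reuses the usersDF from earlier and writes with an explicit save mode:

import org.apache.spark.sql.SaveMode;

// Overwrite deletes any existing output before writing; none of the modes take locks
usersDF.select("name", "favorite_color")
  .write()
  .mode(SaveMode.Overwrite)   // equivalently .mode("overwrite"); "append", "ignore", "error" also work
  .parquet("namesAndFavColors_overwrite.parquet");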

Saving to Persistent Tables

A DataFrame can be persisted into the Hive metastore with the saveAsTable method. By default, saveAsTable creates a "managed table", meaning that the location of the data is controlled by the metastore; dropping the table also deletes its data.
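
A minimal sketch, with a made-up table name, of persisting the same DataFrame as a managed table:

// saveAsTable registers a managed table in the metastore; dropping the table later removes its data too
usersDF.write().saveAsTable("users_managed");

// The persisted table can then be queried like any other table
spark.sql("SELECT name FROM users_managed").show();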

Parquet Files

Parquet is a popular columnar storage format. Spark SQL supports both reading and writing Parquet files, and Parquet files automatically preserve the schema of the original data. When writing Parquet files, all columns are automatically converted to nullable for compatibility reasons.

Loading Data Programmatically

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Other key features (see the official documentation):

Partition Discovery

Schema Merging

Hive metastore Parquet table conversion

Metadata Refreshing

Configuration

JSON Datasets

Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset<Row>. This conversion is done with SparkSession.read().json() on either a JSON file or an RDD of Strings.

Note that the file offered as a "JSON file" is not a typical multi-line JSON document: each line must contain a separate, self-contained, valid JSON object. As a consequence, a regular multi-line JSON file will most often fail to load.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
        new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Hive Tables

Spark SQL supports reading and writing data stored in Hive. However, Hive has a large number of dependencies, so it is not included in the default Spark distribution. To enable Hive support, place the relevant Hive jars on the classpath (of all nodes).
The configuration files hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) go in conf/.
Start by instantiating a SparkSession; Hive support still works even if you do not have an existing Hive deployment.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = "spark-warehouse";
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("LOAD DATA LOCAL INPATH "examples/src/main/resources/kv1.txt" INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DaraFrames are of type Row, which lets you to access each column by ordinal.
Dataset stringsDS = sqlDF.map(new MapFunction() {
  @Override
  public String call(Row row) throws Exception {
    return "Key: " + row.get(0) + ", Value: " + row.get(1);
  }
}, Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...

Interacting with Different Versions of the Hive Metastore: omitted here; see the official documentation.

JDBC to Other Databases

Spark SQL can also read from other databases over JDBC. This functionality should be preferred over JdbcRDD, because it returns the results as a DataFrame, which is easier to work with in Spark SQL and to join with data from other sources.
To get started, include the JDBC driver for your database on the Spark classpath. For example, the following line includes the driver for accessing PostgreSQL:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar 

Tables from a remote database can be loaded as a DataFrame or as a Spark SQL temporary table through the Data Sources API. The available options are:

url: an ordinary JDBC connection URL.

dbtable: the JDBC table to read. Note that anything that is valid in the FROM clause of a SQL query can be used here: either the full table name or a subquery in parentheses.

driver: the class name of the JDBC driver. This class must be available on the master and on all worker nodes so that each node can register the driver with the JDBC subsystem.

fetchsize: the JDBC fetch size, which determines how many rows to fetch per round trip. Defaults to 1000.

isolationLevel: one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE; defaults to READ_UNCOMMITTED.

truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.

createTableOptions This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing.

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

Troubleshooting
The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java's DriverManager does a security check that causes it to ignore all drivers not visible to the primordial class loader when it opens a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs. Also, some databases, such as H2, convert all names to upper case; you will need to use upper case to refer to those names in Spark SQL as well.

Performance Tuning

Caching Data in Memory
Spark SQL can cache tables in an in-memory columnar format by calling SQLContext.cacheTable("tableName") or DataFrame.cache(). Spark SQL will then scan only the required columns and will automatically tune compression to minimize memory usage and GC pressure. Call SQLContext.uncacheTable("tableName") to remove the table from memory.

In-memory caching can be configured with SQLContext.setConf or by running SET key=value commands in SQL.

spark.sql.inMemoryColumnarStorage.compressed true When set to true, Spark SQL automatically selects a compression codec for each column based on statistics of the data.

spark.sql.inMemoryColumnarStorage.batchSize 10000 Controls the batch size for columnar caching. Larger batch sizes improve memory utilization and compression, but also increase the risk of OOM (out-of-memory) errors when caching data.
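
A minimal sketch using the 2.0+ SparkSession equivalents of the calls above (spark.catalog() and spark.conf()), assuming the people temporary view registered earlier:

// Cache the view in the in-memory columnar format, query it, then release the memory
spark.catalog().cacheTable("people");
spark.sql("SELECT COUNT(*) FROM people").show();
spark.catalog().uncacheTable("people");

// The caching options can be set through the runtime config...
spark.conf().set("spark.sql.inMemoryColumnarStorage.batchSize", "20000");
// ...or with a SQL SET command
spark.sql("SET spark.sql.inMemoryColumnarStorage.compressed=true");
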
Other Configuration Options

The following options can also be used to tune query performance. They may be deprecated in future releases as more optimizations are performed automatically.

spark.sql.files.maxPartitionBytes 134217728 (128 MB) The maximum number of bytes to pack into a single partition when reading files.

spark.sql.files.openCostInBytes 4194304 (4 MB) The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first).

spark.sql.broadcastTimeout 300 Timeout in seconds for the broadcast wait time in broadcast joins

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) Configures the maximum size of a table that will be broadcast to all worker nodes when performing a join. Set to -1 to disable broadcasting. Note that statistics are currently only supported for Hive metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run.

spark.sql.shuffle.partitions 200 Configures the number of partitions to use when shuffling data for joins or aggregations.

Distributed SQL Engine

Spark SQL can act as a distributed query engine using its JDBC/ODBC interface or the command-line tool. In this mode, end users or applications can run SQL queries against Spark SQL directly, without writing any code.
Details are omitted here.

Running the Spark SQL CLI

The Spark SQL CLI is a convenient tool that runs the Hive metastore service in local mode and executes queries typed on the command line. Note that the Spark SQL CLI cannot communicate with the Thrift JDBC server.

Run the following command in the Spark directory to start the Spark SQL CLI:

./bin/spark-sql

Hive configuration is taken from hive-site.xml, core-site.xml, and hdfs-site.xml in the conf directory. Run ./bin/spark-sql --help for a complete list of options.

Aggregations

The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), and min(); users can also define their own aggregate functions.
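
A short hedged example of the built-in aggregates (it loads the same employees.json used by the UDAF examples below; the column aliases are illustrative):

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.countDistinct;

Dataset<Row> employees = spark.read().json("examples/src/main/resources/employees.json");
employees.agg(
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary"),
    countDistinct("name").alias("distinct_names")
).show();
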
Untyped User-Defined Aggregate Functions

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Type-Safe User-Defined Aggregate Functions

import java.io.Serializable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...

}

public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...

}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
References:

http://spark.apache.org/docs/...
http://ifeve.com/apache-spark/
