Spark SQL is the Spark module for processing structured data. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of the data and of the computation being performed. Spark SQL currently offers three ways of interacting with it: SQL statements, the DataFrame API, and the newer Dataset API.
One way to use Spark SQL is to execute SQL queries directly; you can write them in basic SQL syntax or in HiveQL. Spark SQL can also read data from an existing Hive installation.
A DataFrame is a distributed collection of data organized into named columns. Conceptually it is equivalent to a table in a relational database or to a data frame in R/Python. DataFrames can be constructed from a wide range of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.
Dataset is a new API added in Spark 1.6. It aims to combine the benefits of RDDs (strong typing, the ability to use lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, and so on).
Entry Point: SQLContext and SparkSession
Before Spark 2.0, the entry point for all Spark SQL functionality was the SQLContext class or one of its subclasses.
val sc: SparkContext // an existing SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._
From Spark 2.0 onwards, the entry point is SparkSession, which is built with SparkSession.builder():
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
SparkSession in Spark 2.0 provides built-in support for Hive features, including writing queries in HiveQL, access to Hive UDFs, and reading data from Hive tables, and you no longer need an existing Hive setup to use them. Before 2.0, you had to add the HiveContext dependency and use HiveContext to get these features.
DataFrame
A DataFrame can be constructed from a wide range of sources, such as structured data files, tables in Hive, external databases, or existing RDDs.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Since Spark 2.0, a DataFrame is simply a Dataset of Rows (in Java and Scala). DataFrames provide a domain-specific language for working with structured data.
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+
For a complete list of operations, see the Dataset API documentation.
Datasets also come with a rich library of string, date, math, and other functions; see the functions documentation for the full list.
You can also run SQL queries programmatically: register the DataFrame as a temporary view and call the sql method of the SparkSession, which returns the result as a Dataset<Row>:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
Global Temporary View - the temporary view created above is tied to the SparkSession and disappears when the session ends. If you want a view that is shared across multiple sessions, use a global temporary view instead.
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Dataset
The Dataset API is similar to RDDs; however, instead of Java serialization or Kryo, Datasets use a specialized Encoder to serialize objects for processing and for transmission over the network. While both encoders and standard serialization turn an object into bytes, encoders are generated dynamically from code and use a format that allows Spark to perform operations such as filtering, sorting, and hashing without deserializing the bytes back into an object.
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public int getAge() { return age; }
  public void setAge(int age) { this.age = age; }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer call(Integer value) throws Exception {
    return value + 1;
  }
}, integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Interoperating with RDDs
Spark SQL has two ways of converting an existing RDD into a DataFrame.
The first uses reflection to infer the schema of an RDD that contains objects of a specific type. This reflection-based approach leads to more concise code and is recommended when you already know the schema of your data.
The second constructs a schema programmatically and then applies it to an existing RDD. This approach is more verbose, but it is what you need when you do not know the fields in advance, or when the schema is only available at runtime.
Inferring the Schema Using Reflection
Spark SQL supports automatically converting an RDD of JavaBeans into a DataFrame. Currently, Spark SQL does not support JavaBeans that contain Map fields. You can create a JavaBean by writing a class that implements the Serializable interface.
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(new Function<String, Person>() {
    @Override
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");
      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));
      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.<String>getAs("name");
  }
}, stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
Programmatically Specifying the Schema
You can create a DataFrame programmatically in three steps:
1. Create an RDD of Row objects from the original RDD;
2. Create a schema, represented by a StructType, that matches the structure of the RDD created in step 1;
3. Apply the schema to the RDD of Rows with the SparkSession.createDataFrame method.
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map(new Function<String, Row>() {
  @Override
  public Row call(String record) throws Exception {
    String[] attributes = record.split(",");
    return RowFactory.create(attributes[0], attributes[1].trim());
  }
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
Data Sources
Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on as a normal RDD, and it can also be registered as a temporary table; once registered, you can run SQL statements against it. This section describes the general methods for loading and saving data, covering the different Spark data sources.
In the simplest form, the default data source is used for all operations (Parquet, unless changed via the spark.sql.sources.default setting).
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
You can also manually specify the data source and pass any extra options you need. Data sources are specified by their fully qualified name (for example, org.apache.spark.sql.parquet), but built-in sources can be referred to by their short names (json, parquet, jdbc). DataFrames loaded from any data source type can be converted into other formats using this syntax.
Dataset<Row> peopleDF = spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
Running SQL on files directly: Spark SQL also supports querying a file directly with SQL, without first loading it through the read API.
Dataset<Row> sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
Save Modes
Save operations take an optional SaveMode parameter that specifies how to handle data that already exists. It is important to realize that these save modes do not use any locking and are not atomic. Additionally, when performing an Overwrite, the existing data is deleted before the new data is written out. The available modes are listed below, followed by a short usage sketch.
SaveMode.ErrorIfExists (default), "error": when saving a DataFrame to a data source, throw an exception if the data already exists. This is the default mode.
SaveMode.Append, "append": if data or the table already exists, append the contents of the DataFrame to the existing data.
SaveMode.Overwrite, "overwrite": if data or the table already exists, overwrite it with the contents of the DataFrame.
SaveMode.Ignore, "ignore": if data already exists, skip the save and leave the existing data unchanged, similar to CREATE TABLE IF NOT EXISTS in SQL.
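As an illustrative sketch (not from the original text), a save mode is passed to the writer like this, reusing the usersDF from the load/save example above:

import org.apache.spark.sql.SaveMode;

// Overwrite any existing output instead of failing with the default ErrorIfExists behavior;
// the string form .mode("overwrite") is equivalent
usersDF.select("name", "favorite_color")
  .write()
  .mode(SaveMode.Overwrite)
  .save("namesAndFavColors.parquet");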
Saving to Persistent Tables
A DataFrame can be persisted into a Hive metastore table with the saveAsTable method. By default, saveAsTable creates a "managed table", meaning the location of the data is controlled by the metastore; the data of a managed table is also deleted automatically when the table is dropped. A minimal sketch follows.
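The sketch below assumes the usersDF from the earlier example; the table name users_saved is a hypothetical placeholder:

// Persist the DataFrame as a managed table in the metastore
usersDF.write().saveAsTable("users_saved");

// The table outlives the session and can be read back later with SparkSession.table
Dataset<Row> restoredDF = spark.table("users_saved");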
Parquet Files
Parquet is a popular columnar storage format. Spark SQL provides support for both reading and writing Parquet files, and the schema of the original data is preserved automatically. When writing Parquet files, all columns are automatically converted to nullable for compatibility reasons.
Loading Data Programmatically
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(new MapFunction<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}, Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
For the remaining key features, see the official documentation; a short schema-merging sketch follows this list.
Partition discovery
Schema merging
Hive metastore Parquet table conversion
Metadata refreshing
Configuration
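As an example, schema merging can be requested per read with the mergeSchema option. This is only a sketch; data/test_table is a hypothetical path containing Parquet files written with compatible but different schemas:

// mergeSchema is off by default because it is a relatively expensive operation
Dataset<Row> mergedDF = spark.read()
  .option("mergeSchema", "true")
  .parquet("data/test_table");
mergedDF.printSchema();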
JSON Datasets
Spark SQL can automatically infer the schema of a JSON dataset while loading it and return it as a Dataset<Row>.
Note that a file offered as a JSON dataset is not a typical JSON document: each line must be a separate, self-contained, valid JSON object. As a consequence, a regular multi-line JSON file will usually fail to load.
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
  "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
  new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+
Hive Tables
Spark SQL also supports reading and writing data stored in Hive. However, Hive has a large number of dependencies, so it is not included in the default Spark distribution. To enable Hive support, place the Hive-related jars on the classpath of all nodes.
Configuration of Hive is done by placing the hive-site.xml, core-site.xml (for security configuration), and hdfs-site.xml (for HDFS configuration) files in conf/.
Start by instantiating a SparkSession with Hive support enabled; this works even when you do not have an existing Hive deployment.
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() { return key; }
  public void setKey(int key) { this.key = key; }
  public String getValue() { return value; }
  public void setValue(String value) { this.value = value; }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = "spark-warehouse";
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |     500|
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(new MapFunction<Row, String>() {
  @Override
  public String call(Row row) throws Exception {
    return "Key: " + row.get(0) + ", Value: " + row.get(1);
  }
}, Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
Interacting with different versions of the Hive metastore: omitted here; see the official documentation.
JDBC to Other Databases
Spark SQL can also access other databases over JDBC. This functionality should be preferred over JdbcRDD, because it returns the results as a DataFrame, which is easier to work with in Spark SQL and easier to join with data from other sources.
First, include the JDBC driver for your database on the Spark classpath. For example, the following command makes the PostgreSQL driver available to the Spark shell:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
Tables from a remote database can be loaded as a DataFrame or registered as a Spark SQL temporary view using the Data Sources API. The supported options are listed below; a short write sketch using some of them follows the list.
url: the ordinary JDBC URL to connect to.
dbtable: the JDBC table to read. Note that anything that is valid in the FROM clause of a SQL query can be used here, either a full table name or a subquery in parentheses.
driver: the class name of the JDBC driver. This class must be visible on both the master and the worker nodes so that each node can register the driver with JDBC's subsystem.
fetchsize: the JDBC fetch size, which determines how many rows to fetch per round trip. Defaults to 1000.
isolationLevel: the transaction isolation level; one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE. Defaults to READ_UNCOMMITTED.
truncate: a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and it prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
createTableOptions: a JDBC writer related option. If specified, it allows setting database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB). This option applies only to writing.
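As a hedged illustration of the truncate option above, the sketch below reuses the jdbcDF and connectionProperties defined in the full example that follows; the connection details are placeholders:

import org.apache.spark.sql.SaveMode;

jdbcDF.write()
  .mode(SaveMode.Overwrite)
  .option("truncate", "true")   // keep the existing table definition, only replace its rows
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);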
import java.util.Properties;

// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods

// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
Troubleshooting
The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java's DriverManager performs a security check before opening a connection and ignores drivers that are not visible to the primordial class loader. One convenient way to achieve this is to modify compute_classpath.sh on all worker nodes to include the required driver jars. Also, some databases, such as H2, convert all names to upper case; for these databases you need to use upper-case names in Spark SQL as well.
Caching Data in Memory
Spark SQL can cache tables in an in-memory columnar format by calling SQLContext.cacheTable("tableName") or DataFrame.cache(). Spark SQL will then scan only the required columns and will automatically tune compression to minimize memory usage and GC pressure. You can call SQLContext.uncacheTable("tableName") to remove the table from memory.
In-memory caching can be configured using SQLContext.setConf or by running SET key=value commands in SQL. The relevant options are listed below, followed by a short sketch.
spark.sql.inMemoryColumnarStorage.compressed (default: true): when set to true, Spark SQL automatically selects a compression codec for each column based on statistics of the data.
spark.sql.inMemoryColumnarStorage.batchSize (default: 10000): controls the batch size for columnar caching. Larger batch sizes improve memory utilization and compression, but increase the risk of OutOfMemoryError when caching data.
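A minimal sketch using the Spark 2.x session API (the catalog and conf accessors), assuming the people view registered earlier; it is illustrative rather than prescriptive:

// Cache the registered view in the in-memory columnar store
spark.catalog().cacheTable("people");
spark.sql("SELECT count(*) FROM people").show();   // subsequent scans read the cached columns

// Tune the columnar cache, then release the cached data
spark.conf().set("spark.sql.inMemoryColumnarStorage.batchSize", "20000");
spark.catalog().uncacheTable("people");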
Other Configuration Options
The following options can also be used to tune query performance. They may be deprecated in future releases as more optimizations are performed automatically. A small sketch of setting them follows the list.
spark.sql.files.maxPartitionBytes (default: 134217728, i.e. 128 MB): the maximum number of bytes to pack into a single partition when reading files.
spark.sql.files.openCostInBytes (default: 4194304, i.e. 4 MB): the estimated cost to open a file, measured by the number of bytes that could be scanned in the same time. This is used when packing multiple files into a partition. It is better to over-estimate it; partitions with small files will then be faster than partitions with bigger files (which are scheduled first).
spark.sql.broadcastTimeout (default: 300): timeout in seconds for the broadcast wait time in broadcast joins.
spark.sql.autoBroadcastJoinThreshold (default: 10485760, i.e. 10 MB): the maximum size of a table that can be broadcast to all worker nodes when performing a join. Set to -1 to disable broadcasting. Note that currently statistics are only supported for Hive metastore tables, and the ANALYZE TABLE command must have been run on them.
spark.sql.shuffle.partitions (default: 200): the number of partitions to use when shuffling data for joins or aggregations.
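For instance, these options can be set on the session conf or through SQL SET commands; the values below are chosen arbitrarily for illustration:

spark.conf().set("spark.sql.shuffle.partitions", "50");
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=20971520");   // 20 MB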
Distributed SQL Engine
Spark SQL can also act as a distributed query engine through its JDBC/ODBC interface or the command-line tool. In this mode, end users or applications can run SQL queries against Spark SQL directly, without writing any code.
Running the Thrift JDBC/ODBC server is omitted here; see the official documentation. A minimal client sketch is given below.
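The server is started with ./sbin/start-thriftserver.sh and listens on port 10000 by default. The following is a hypothetical client sketch; it assumes the Hive JDBC driver is on the client classpath and that a people table or view is visible to the server:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ThriftServerClient {
  public static void main(String[] args) throws Exception {
    // Connect to the Spark Thrift server over the HiveServer2 protocol
    try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "");
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT name FROM people")) {
      while (rs.next()) {
        System.out.println(rs.getString("name"));
      }
    }
  }
}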
The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute queries typed on the command line. Note that the Spark SQL CLI cannot currently talk to the Thrift JDBC server.
To start the Spark SQL CLI, run the following in the Spark directory:
./bin/spark-sql
Hive is configured through the hive-site.xml, core-site.xml, and hdfs-site.xml files in the conf directory. You can see a complete list of options by running ./bin/spark-sql --help.
Aggregations
The built-in DataFrame functions provide common aggregations such as count(), countDistinct(), avg(), max(), and min(); users can also define their own aggregate functions. A short sketch with the built-in functions is shown below, followed by user-defined aggregate functions.
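The sketch below uses the employees.json example file that also appears later in this section; the variable name employees is just a placeholder:

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.max;

Dataset<Row> employees = spark.read().json("examples/src/main/resources/employees.json");
// Aggregate over the whole DataFrame, then per group
employees.agg(avg("salary"), max("salary")).show();
employees.groupBy("name").agg(avg("salary")).show();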
Untyped User-Defined Aggregate Functions
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() { return inputSchema; }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() { return bufferSchema; }
  // The data type of the returned value
  public DataType dataType() { return DataTypes.DoubleType; }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() { return true; }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
Type-Safe User-Defined Aggregate Functions
import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...
}

public static class Average implements Serializable {
  private long sum;
  private long count;

  // Constructors, getters, setters...
}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

References
http://spark.apache.org/docs/...
http://ifeve.com/apache-spark/