spark SQL支持像SQL,hiveQL或scala那样的关系型查询语句,sparkSQL中核心的组件是SchemaRDD,SchemaRDD由row对象和描述每行中每列的数据类型的schema组成。SchemaRDD和关系型数据库中的表类似,可以从已存在的RDD,parquet文件,json数据集或运行HiveQL创建而来。
sparkSQL中最重要的一个类是SQLContext或它的继承类,创建SQLContext如下:
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
除了基本的SQLContext外,还可以创建HiveContext,它提供了SQLContext所没有的超函数,其他的特征包括使用更复杂的HiveQL解析器进行解析查询语句、访问HiveUDF、从hive表中读取数据。当使用HiveContext,不必安装hive,并且任何SQLContext可用的数据源都可用于HiveContext。
指定的用于解析查询语句的SQL变体可通过spark.sql.dialect选项进行设置,该参数可通过SQLContext的setConf方法或在SQL中使用SET key=value进行设置。对于SQLContext来说,唯一可用的dialect是sql,它使用了SparkSQL提供的一个简单的SQL解析器,对于HiveContext来说,默认的dialect是hiveql,sql也是可用的,但HiveQL解析器会复杂的多。
数据源
sparkSQL支持多种数据源,SchemaRDD可被当作普通的RDD操作并能被注册为一个临时表,注册SchemaRDD成为一个表允许你在它的数据上运行SQL查询。本节描述了多种将数据导入SchemaRDD的方法。
RDD
sparkSQL支持两种不同的方式来将已存在的RDD转换为SchemaRDD,第一种方式使用反射来推断RDD的schema,这种方式会有很多简洁的代码并且能在你书写spark应用程序时已经知道schema的情况下很好的工作。
使用反射推断schema
sparkSQL中的scala接口支持自动将包含case类的RDD转换为SchemaRDD,其中case类定义了表的schema。case类中的属性会被用于表中的列名,case类还可以被嵌套或包含复杂类型如Sequence或Array。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
编程方式指定的schema
当case类不能被提前定义(如数据被编码成字符串或文本数据集将被解析),这时可以以编程方式来创建SchemaRDD:
1.从原始的RDD创建行的RDD
2.创建匹配第一步创建的RDD中的行结构的StructType
3.使用SQLContext的applySchema方法将schema应用到行RDD中
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Spark SQL data types and Row.
import org.apache.spark.sql.
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)
parquet文件
parquet是柱状格式数据,它可以被其他很多的数据处理系统支持,sparkSQL支持读写parquet文件以自动保护原始数据的schema。
编程方式导入数据
例子如下:
// sqlContext from the previous example is used in this example.
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
配置
parquet的配置可通过SQLContext的setConf方法或通过运行SET key=value方式完成。
属性名 | 默认值 | 注释 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 一些其他的支持parquet的系统如impala或旧版本的spark,在写出parquet schema的时候不会区分二进制数据和字符串。该配置选项告诉sparkSQL将字符串解析为二进制数据以使兼容这些系统 |
spark.sql.parquet.cacheMetadata | false | 打开缓存parquet schema元数据会提升静态数据的查询速度 |
spark.sql.parquet.compression.codec | snappy | 设置写parquet文件的压缩方式,包括:uncompressed,snappy,gzip,lzo |
json数据集
sparkSQL可以通过json数据集推断出schema并将该schema转换为SchemaRDD,该转换可以通过SQLContext的以下两个方法实现:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
// Create a SchemaRDD from the file(s) pointed to by path
val people = sqlContext.jsonFile(path)
// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
// |-- age: IntegerType
// |-- name: StringType
// Register this SchemaRDD as a table.
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// Alternatively, a SchemaRDD can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
hive表
sparkSQL同时也支持读写存储在hive中数据,然而,由于hive有很多的依赖,所以在使用hive前需要运行sbt/sbt -Phive assembly/assembly来编译一个包含hive的jar包。这个jar包也需要放到所有的worker节点上。
配置hive的一些参数需要放在conf目录的hive-site.xml文件中。
当需要使用hive则必须构造一个HiveContext,它继承于SQLContext,当用户没有部署过hive也可以创建HiveContext,当没有配置hive-site.xml文件,那么context会自动在当前目录中创建metastore_db和warehouse。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
性能优化
对于很多情况来说需要使用将数据缓存在内存中或使用一些试验性选项来优化的方式来提高性能。
将数据缓存在内存
sparkSQL可以通过cacheTable("tableName")方法将表缓存在内存中,这样sparkSQL将只会扫描需要的列并将自动优化压缩以最小化内存使用和GC压力。使用uncacheTable("tableName")将表从内存移除。如果使用cache,那么表将不会使用柱状格式缓存在内存中,所以推荐使用cacheTable方法。
相关的一些参数如下:
属性名 | 默认值 | 注释 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | false | 当为true时,sparkSQL会基于数据的统计情况自动选择一种压缩方式来压缩每列数据 |
spark.sql.inMemoryColumnarStorage.batchSize | 1000 | 控制缓存时的大小,较大的值可以提高内存使用率和压缩效率,但会出现OOM |
其他的配置选项
下面的一些参数也可以用来优化查询性能:
属性名 | 默认值 | 注释 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10000 | 当处理join查询时广播到每个worker的表的最大字节数,当设置为-1广播功能将失效 |
spark.sql.codegen | false | 当为true时,那么会在运行时动态生成指定查询的表达式,对于那些拥有复杂表达式的查询来说,该选项会导致明显的速度提升,然而对于简单的查询该选项会减慢查询速度 |
spark.sql.shuffle.partitions | 200 | 配置在处理join或aggregation时shuffle数据的partition数量 |
其他sql接口
运行thrift jdbc server
运行如下命令即可启动jdbc server:
./sbin/start-thriftserver.sh
该命令接受所有bin/spark-submit的命令行参数,另外还可通过--hiveconf参数指定hive的属性文件,默认该服务会监听localhost:10000,可通过以下两种方式进行修改:
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh --master <master-uri> ...
./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=<listening-port> --hiveconf hive.server2.thrift.bind.host=<listening-host> --master <master-uri> ...
现在就可以使用下面命令进行测试了:
./bin/beeline
然后在beeline命令行模式下运行!connect jdbc:hive2://localhost:10000,beeline将会提示你输入用户名和密码,具体可查看beeline文档。
运行sparkSQL CLI
sparkSQL CLI可以在本地模式运行hive metastore并执行通过命令行输入的查询语句,sparkSQL CLI不能与thrift jdbc服务进行通信。下面代码可以启动sparkSQL CLI:
./bin/spark-sql
与其他系统兼容
shark用户迁移指南
调度
通过SET spark.sql.thriftserver.scheduler.pool=accounting;来设置jdbc客户端会话的公平调度池。
reducer数量
在shark中,默认的reducer数量为1并且通过mapred.reduce.tasks属性进行设置,sparkSQL已经废弃了该属性,以spark.sql.shuffle.partitions(默认值为200)代替,通过以下代码进行设置:
SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c
FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;
还可以将该属性放在hive-site.xml文件中覆盖默认值。到目前,mapred.reduce.tasks属性还可以使用,会被自动转换为spark.sql.shuffle.partitions。
缓存
spark.cache表属性也不再可用了,任何以_cached结尾的表名也不会被缓存了,现在通过CACHE TABLE和UNCACHE TABLE两个语句来控制表缓存:
CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;
注意:CACHE TABLE tbl是懒加载的,类似RDD中的.cache方法,该命令只是将tbl标记为当被计算的时候确保partitions被缓存,只有等tbl真正被执行的时候才会缓存。如果希望立即被缓存,使用如下方式:
CACHE TABLE logs_last_month;
SELECT COUNT(1) FROM logs_last_month;
与hive兼容
sparkSQL可以与hive metastore,serdes和UDF兼容。sparkSQL thrift jdbc服务能够与当前已经安装的hive进行兼容,不需要修改已存在的hive metastore或者改变数据存放目录。
支持的hive特性
sparkSQL支持很多hive特性,如:
不支持的hive功能
主要的hive特性
深奥的hive特性
hive输入输出格式
hive优化
很多的hive优化目前都不能用户sparkSQL,其中一些(如索引)对于sparkSQL不是很重要的,因为sparkSQL是内存计算模型,其他的一些会在未来的sparkSQL版本中得到支持:
书写语言集成的关系型查询
语言集成查询是高级特性并只在scala中支持。如:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
使用'符号会隐式将相关语句转换为sql表达式,相关详细信息可查看scalaDoc。
sparkSQL数据类型
数据类型 | scala中值类型 | 访问api或创建数据类型 |
---|---|---|
ByteType | Byte | ByteType |
ShortType | Short | ShortType |
IntegerType | Int | IntegerType |
LongType | Long | LongType |
FloatType | Float | FloatType |
Double | Double | DoubleType |
DecimalType | scala.math.sql.BigDecimal | DecimalType |
StringType | String | StringType |
BinaryType | Array[Byte] | BinaryType |
BooleanType | Boolean | BooleanType |
TimestampType | java.sql.Timestamp | TimestampType |
ArrayType | scala.collection.Seq | ArrayType(elementType,[containsNull]),containsNull默认值为false |
MapType | scala.collection.Map | MapType(keyType,valueType,[valueContainsNull]),valueContainsNull默认为true |
StructType | org.apache.spark.sql.Row | StructType(fields),fields是StructFields的Seq,两个名字相同的fields是不允许的 |
StructField | 字段数据类型在scala的值类型 | StructField(name,dataType,nullable) |