spark SQL支持像SQL,hiveQL或scala那样的关系型查询语句,sparkSQL中核心的组件是SchemaRDD,SchemaRDD由row对象和描述每行中每列的数据类型的schema组成。SchemaRDD和关系型数据库中的表类似,可以从已存在的RDD,parquet文件,json数据集或运行HiveQL创建而来。
sparkSQL中最重要的一个类是SQLContext或它的继承类,创建SQLContext如下:
除了基本的SQLContext外,还可以创建HiveContext,它提供了SQLContext所没有的超函数,其他的特征包括使用更复杂的HiveQL解析器进行解析查询语句、访问HiveUDF、从hive表中读取数据。当使用HiveContext,不必安装hive,并且任何SQLContext可用的数据源都可用于HiveContext。
指定的用于解析查询语句的SQL变体可通过spark.sql.dialect选项进行设置,该参数可通过SQLContext的setConf方法或在SQL中使用SET key=value进行设置。对于SQLContext来说,唯一可用的dialect是sql,它使用了SparkSQL提供的一个简单的SQL解析器,对于HiveContext来说,默认的dialect是hiveql,sql也是可用的,但HiveQL解析器会复杂的多。
数据源
sparkSQL支持多种数据源,SchemaRDD可被当作普通的RDD操作并能被注册为一个临时表,注册SchemaRDD成为一个表允许你在它的数据上运行SQL查询。本节描述了多种将数据导入SchemaRDD的方法。
RDD
sparkSQL支持两种不同的方式来将已存在的RDD转换为SchemaRDD,第一种方式使用反射来推断RDD的schema,这种方式会有很多简洁的代码并且能在你书写spark应用程序时已经知道schema的情况下很好的工作。
使用反射推断schema
sparkSQL中的scala接口支持自动将包含case类的RDD转换为SchemaRDD,其中case类定义了表的schema。case类中的属性会被用于表中的列名,case类还可以被嵌套或包含复杂类型如Sequence或Array。
编程方式指定的schema
当case类不能被提前定义(如数据被编码成字符串或文本数据集将被解析),这时可以以编程方式来创建SchemaRDD:
1.从原始的RDD创建行的RDD
2.创建匹配第一步创建的RDD中的行结构的StructType
3.使用SQLContext的applySchema方法将schema应用到行RDD中
parquet文件
parquet是柱状格式数据,它可以被其他很多的数据处理系统支持,sparkSQL支持读写parquet文件以自动保护原始数据的schema。
编程方式导入数据
例子如下:
配置
parquet的配置可通过SQLContext的setConf方法或通过运行SET key=value方式完成。
spark.sql.parquet.binaryAsString |
false |
一些其他的支持parquet的系统如impala或旧版本的spark,在写出parquet schema的时候不会区分二进制数据和字符串。该配置选项告诉sparkSQL将字符串解析为二进制数据以使兼容这些系统 |
spark.sql.parquet.cacheMetadata |
false |
打开缓存parquet schema元数据会提升静态数据的查询速度 |
spark.sql.parquet.compression.codec |
snappy |
设置写parquet文件的压缩方式,包括:uncompressed,snappy,gzip,lzo |
json数据集
sparkSQL可以通过json数据集推断出schema并将该schema转换为SchemaRDD,该转换可以通过SQLContext的以下两个方法实现:
jsonFile-从json文件所在目录中导入数据,文件的每行必须是一个json对象
jsonRdd-从已存在的RDD导入数据,RDD中的每个元素是包含json对象的字符串
hive表
sparkSQL同时也支持读写存储在hive中数据,然而,由于hive有很多的依赖,所以在使用hive前需要运行sbt/sbt -Phive assembly/assembly来编译一个包含hive的jar包。这个jar包也需要放到所有的worker节点上。
配置hive的一些参数需要放在conf目录的hive-site.xml文件中。
当需要使用hive则必须构造一个HiveContext,它继承于SQLContext,当用户没有部署过hive也可以创建HiveContext,当没有配置hive-site.xml文件,那么context会自动在当前目录中创建metastore_db和warehouse。
性能优化
对于很多情况来说需要使用将数据缓存在内存中或使用一些试验性选项来优化的方式来提高性能。
将数据缓存在内存
sparkSQL可以通过cacheTable("tableName")方法将表缓存在内存中,这样sparkSQL将只会扫描需要的列并将自动优化压缩以最小化内存使用和GC压力。使用uncacheTable("tableName")将表从内存移除。如果使用cache,那么表将不会使用柱状格式缓存在内存中,所以推荐使用cacheTable方法。
相关的一些参数如下:
spark.sql.inMemoryColumnarStorage.compressed |
false |
当为true时,sparkSQL会基于数据的统计情况自动选择一种压缩方式来压缩每列数据 |
spark.sql.inMemoryColumnarStorage.batchSize |
1000 |
控制缓存时的大小,较大的值可以提高内存使用率和压缩效率,但会出现OOM |
其他的配置选项
下面的一些参数也可以用来优化查询性能:
spark.sql.autoBroadcastJoinThreshold |
10000 |
当处理join查询时广播到每个worker的表的最大字节数,当设置为-1广播功能将失效 |
spark.sql.codegen |
false |
当为true时,那么会在运行时动态生成指定查询的表达式,对于那些拥有复杂表达式的查询来说,该选项会导致明显的速度提升,然而对于简单的查询该选项会减慢查询速度 |
spark.sql.shuffle.partitions |
200 |
配置在处理join或aggregation时shuffle数据的partition数量 |
其他sql接口
运行thrift jdbc server
运行如下命令即可启动jdbc server:
该命令接受所有bin/spark-submit的命令行参数,另外还可通过--hiveconf参数指定hive的属性文件,默认该服务会监听localhost:10000,可通过以下两种方式进行修改:
现在就可以使用下面命令进行测试了:
然后在beeline命令行模式下运行!connect jdbc:hive2://localhost:10000,beeline将会提示你输入用户名和密码,具体可查看beeline文档。
运行sparkSQL CLI
sparkSQL CLI可以在本地模式运行hive metastore并执行通过命令行输入的查询语句,sparkSQL CLI不能与thrift jdbc服务进行通信。下面代码可以启动sparkSQL CLI:
与其他系统兼容
shark用户迁移指南
调度
通过SET spark.sql.thriftserver.scheduler.pool=accounting;来设置jdbc客户端会话的公平调度池。
reducer数量
在shark中,默认的reducer数量为1并且通过mapred.reduce.tasks属性进行设置,sparkSQL已经废弃了该属性,以spark.sql.shuffle.partitions(默认值为200)代替,通过以下代码进行设置:
还可以将该属性放在hive-site.xml文件中覆盖默认值。到目前,mapred.reduce.tasks属性还可以使用,会被自动转换为spark.sql.shuffle.partitions。
缓存
spark.cache表属性也不再可用了,任何以_cached结尾的表名也不会被缓存了,现在通过CACHE TABLE和UNCACHE TABLE两个语句来控制表缓存:
注意:CACHE TABLE tbl是懒加载的,类似RDD中的.cache方法,该命令只是将tbl标记为当被计算的时候确保partitions被缓存,只有等tbl真正被执行的时候才会缓存。如果希望立即被缓存,使用如下方式:
与hive兼容
sparkSQL可以与hive metastore,serdes和UDF兼容。sparkSQL thrift jdbc服务能够与当前已经安装的hive进行兼容,不需要修改已存在的hive metastore或者改变数据存放目录。
支持的hive特性
sparkSQL支持很多hive特性,如:
hive 查询语句:select,group by,order by,cluster by,sort by
所有hive操作:关系型操作(=, ⇔, ==, <>, <, >, >=, <=等),算术操作(+, -, *, /, %等),裸机操作(AND, &&, OR, ||等),复杂类型构造,数学函数(sign, ln, cos等),字符串函数(instr, length, printf等)
用户定义函数(UDF)
用户定义聚合函数(UDAF)
用户定义序列化格式(serdes)
join:join,{left|right|full} outer join,left semi join,cross join
联合查询
子查询:select col from(select a+b as col from t1)t2
取样操作
解释操作
表分割
所有的hive DDL函数:create table,create table as select,alter table
大部分的hive数据类型:TINYINT,SMALLINT,INT,BIGINT,BOOLEAN,FLOAT,DOUBLE,STRING,BINARY,TIMESTAMP,ARRAY<>,MAP<>,STRUCT<>
不支持的hive功能
主要的hive特性
sparkSQL目前不支持使用动态分片向表插入数据
sparkSQL不支持带有桶的表,桶是hive表分片的hash分片方式
深奥的hive特性
hive中带有分片的表使用不同的输入格式,在sparkSQL中,所有的表分片使用相同的输入格式
hive支持在join操作使用non-equi的条件(如key<10),在sparkSQL中如果使用这种方式会输出错误的结果
UNION和DATE类型
唯一join
单查询多插入
sparkSQL不支持piggyback浏览来收集列统计,只支持操作hive metastore中的sizeInBytes字段
hive输入输出格式
sparkSQL只支持TextOutputFormat
hadoop归档文件
hive优化
很多的hive优化目前都不能用户sparkSQL,其中一些(如索引)对于sparkSQL不是很重要的,因为sparkSQL是内存计算模型,其他的一些会在未来的sparkSQL版本中得到支持:
块级别的bitmap索引和虚拟字段(用来建立索引)
自动将join转换为map join:在大表与很多小表进行join时,hive会自动将join转换为map join,下个版本的sparkSQL会得到支持
自动决定join和groupby时reducer的数量,目前,sparkSQL需要使用SET spark.sql.shuffle.partitions=[num_tasks];来控制并行度
只查询元数据:hive在查询时可以只查询元数据,sparkSQL需要部署任务来计算结果
sparkSQL不支持hive中的skew data flag
sparkSQL不支持hive的STREAMTABLE
合并多个小文件作为查询结果:如果查询结果包括很多的小文件,hive可以合并这些小文件为大文件避免HDFS元数据容量溢出。sparkSQL暂时不支持该特性
书写语言集成的关系型查询
语言集成查询是高级特性并只在scala中支持。如:
使用'符号会隐式将相关语句转换为sql表达式,相关详细信息可查看scalaDoc。
sparkSQL数据类型
数字类型:Byte,Short,Integer,Long,Float,Double,Decimal
字符串类型:String
二进制类型:Binary
布尔类型:Boolean
日期类型:Timestamp
复杂类型:Array(elementType,containsNull),表示一系列的elementType类型的数组,containsNull表示数组中的值是否允许为null;Map(keyType,valueType,valueContainsNull):表示一系列的key-value值对;Struct(fields):表示由一系列的StructFields组成的结构体,StructFields的定义为StructField(name,dataType,nullable)
所有的sparkSQL数据类型都在org.apache.spark.sql包中,通过import org.apache.spark.sql._来导入所有的数据类型。
ByteType |
Byte |
ByteType |
ShortType |
Short |
ShortType |
IntegerType |
Int |
IntegerType |
LongType |
Long |
LongType |
FloatType |
Float |
FloatType |
Double |
Double |
DoubleType |
DecimalType |
scala.math.sql.BigDecimal |
DecimalType |
StringType |
String |
StringType |
BinaryType |
Array[Byte] |
BinaryType |
BooleanType |
Boolean |
BooleanType |
TimestampType |
java.sql.Timestamp |
TimestampType |
ArrayType |
scala.collection.Seq |
ArrayType(elementType,[containsNull]),containsNull默认值为false |
MapType |
scala.collection.Map |
MapType(keyType,valueType,[valueContainsNull]),valueContainsNull默认为true |
StructType |
org.apache.spark.sql.Row |
StructType(fields),fields是StructFields的Seq,两个名字相同的fields是不允许的 |
StructField |
字段数据类型在scala的值类型 |
StructField(name,dataType,nullable) |