spark之路第十课——spark SQL编程

spark SQL支持像SQL,hiveQL或scala那样的关系型查询语句,sparkSQL中核心的组件是SchemaRDD,SchemaRDD由row对象和描述每行中每列的数据类型的schema组成。SchemaRDD和关系型数据库中的表类似,可以从已存在的RDD,parquet文件,json数据集或运行HiveQL创建而来。
sparkSQL中最重要的一个类是SQLContext或它的继承类,创建SQLContext如下:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD

除了基本的SQLContext外,还可以创建HiveContext,它提供了SQLContext所没有的超函数,其他的特征包括使用更复杂的HiveQL解析器进行解析查询语句、访问HiveUDF、从hive表中读取数据。当使用HiveContext,不必安装hive,并且任何SQLContext可用的数据源都可用于HiveContext。
指定的用于解析查询语句的SQL变体可通过spark.sql.dialect选项进行设置,该参数可通过SQLContext的setConf方法或在SQL中使用SET key=value进行设置。对于SQLContext来说,唯一可用的dialect是sql,它使用了SparkSQL提供的一个简单的SQL解析器,对于HiveContext来说,默认的dialect是hiveql,sql也是可用的,但HiveQL解析器会复杂的多。

数据源

sparkSQL支持多种数据源,SchemaRDD可被当作普通的RDD操作并能被注册为一个临时表,注册SchemaRDD成为一个表允许你在它的数据上运行SQL查询。本节描述了多种将数据导入SchemaRDD的方法。

RDD

sparkSQL支持两种不同的方式来将已存在的RDD转换为SchemaRDD,第一种方式使用反射来推断RDD的schema,这种方式会有很多简洁的代码并且能在你书写spark应用程序时已经知道schema的情况下很好的工作。

使用反射推断schema

sparkSQL中的scala接口支持自动将包含case类的RDD转换为SchemaRDD,其中case类定义了表的schema。case类中的属性会被用于表中的列名,case类还可以被嵌套或包含复杂类型如Sequence或Array。

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

编程方式指定的schema

当case类不能被提前定义(如数据被编码成字符串或文本数据集将被解析),这时可以以编程方式来创建SchemaRDD:
1.从原始的RDD创建行的RDD
2.创建匹配第一步创建的RDD中的行结构的StructType
3.使用SQLContext的applySchema方法将schema应用到行RDD中

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Spark SQL data types and Row.
import org.apache.spark.sql.
// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleSchemaRDD = sqlContext.applySchema(rowRDD, schema)
// Register the SchemaRDD as a table.
peopleSchemaRDD.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
results.map(t => "Name: " + t(0)).collect().foreach(println)

parquet文件

parquet是柱状格式数据,它可以被其他很多的数据处理系统支持,sparkSQL支持读写parquet文件以自动保护原始数据的schema。

编程方式导入数据

例子如下:

// sqlContext from the previous example is used in this example.
// createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
import sqlContext.createSchemaRDD
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")
// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a SchemaRDD.
val parquetFile = sqlContext.parquetFile("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

配置

parquet的配置可通过SQLContext的setConf方法或通过运行SET key=value方式完成。

属性名 默认值 注释
spark.sql.parquet.binaryAsString false 一些其他的支持parquet的系统如impala或旧版本的spark,在写出parquet schema的时候不会区分二进制数据和字符串。该配置选项告诉sparkSQL将字符串解析为二进制数据以使兼容这些系统
spark.sql.parquet.cacheMetadata false 打开缓存parquet schema元数据会提升静态数据的查询速度
spark.sql.parquet.compression.codec snappy 设置写parquet文件的压缩方式,包括:uncompressed,snappy,gzip,lzo

json数据集

sparkSQL可以通过json数据集推断出schema并将该schema转换为SchemaRDD,该转换可以通过SQLContext的以下两个方法实现:

  • jsonFile-从json文件所在目录中导入数据,文件的每行必须是一个json对象
  • jsonRdd-从已存在的RDD导入数据,RDD中的每个元素是包含json对象的字符串

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // A JSON dataset is pointed to by path.
    // The path can be either a single text file or a directory storing text files.
    val path = "examples/src/main/resources/people.json"
    // Create a SchemaRDD from the file(s) pointed to by path
    val people = sqlContext.jsonFile(path)
    // The inferred schema can be visualized using the printSchema() method.
    people.printSchema()
    // root
    //  |-- age: IntegerType
    //  |-- name: StringType
    // Register this SchemaRDD as a table.
    people.registerTempTable("people")
    // SQL statements can be run by using the sql methods provided by sqlContext.
    val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    // Alternatively, a SchemaRDD can be created for a JSON dataset represented by
    // an RDD[String] storing one JSON object per string.
    val anotherPeopleRDD = sc.parallelize(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

    hive表

    sparkSQL同时也支持读写存储在hive中数据,然而,由于hive有很多的依赖,所以在使用hive前需要运行sbt/sbt -Phive assembly/assembly来编译一个包含hive的jar包。这个jar包也需要放到所有的worker节点上。
    配置hive的一些参数需要放在conf目录的hive-site.xml文件中。
    当需要使用hive则必须构造一个HiveContext,它继承于SQLContext,当用户没有部署过hive也可以创建HiveContext,当没有配置hive-site.xml文件,那么context会自动在当前目录中创建metastore_db和warehouse。

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
    sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
    // Queries are expressed in HiveQL
    sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

    性能优化

    对于很多情况来说需要使用将数据缓存在内存中或使用一些试验性选项来优化的方式来提高性能。

    将数据缓存在内存

    sparkSQL可以通过cacheTable("tableName")方法将表缓存在内存中,这样sparkSQL将只会扫描需要的列并将自动优化压缩以最小化内存使用和GC压力。使用uncacheTable("tableName")将表从内存移除。如果使用cache,那么表将不会使用柱状格式缓存在内存中,所以推荐使用cacheTable方法。
    相关的一些参数如下:

    属性名 默认值 注释
    spark.sql.inMemoryColumnarStorage.compressed false 当为true时,sparkSQL会基于数据的统计情况自动选择一种压缩方式来压缩每列数据
    spark.sql.inMemoryColumnarStorage.batchSize 1000 控制缓存时的大小,较大的值可以提高内存使用率和压缩效率,但会出现OOM

    其他的配置选项

    下面的一些参数也可以用来优化查询性能:

    属性名 默认值 注释
    spark.sql.autoBroadcastJoinThreshold 10000 当处理join查询时广播到每个worker的表的最大字节数,当设置为-1广播功能将失效
    spark.sql.codegen false 当为true时,那么会在运行时动态生成指定查询的表达式,对于那些拥有复杂表达式的查询来说,该选项会导致明显的速度提升,然而对于简单的查询该选项会减慢查询速度
    spark.sql.shuffle.partitions 200 配置在处理join或aggregation时shuffle数据的partition数量

    其他sql接口

    运行thrift jdbc server

    运行如下命令即可启动jdbc server:

    ./sbin/start-thriftserver.sh

    该命令接受所有bin/spark-submit的命令行参数,另外还可通过--hiveconf参数指定hive的属性文件,默认该服务会监听localhost:10000,可通过以下两种方式进行修改:

    export HIVE_SERVER2_THRIFT_PORT=<listening-port>
    export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
    ./sbin/start-thriftserver.sh --master <master-uri> ...

    ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=<listening-port> --hiveconf hive.server2.thrift.bind.host=<listening-host> --master <master-uri> ...

    现在就可以使用下面命令进行测试了:

    ./bin/beeline

    然后在beeline命令行模式下运行!connect jdbc:hive2://localhost:10000,beeline将会提示你输入用户名和密码,具体可查看beeline文档

    运行sparkSQL CLI

    sparkSQL CLI可以在本地模式运行hive metastore并执行通过命令行输入的查询语句,sparkSQL CLI不能与thrift jdbc服务进行通信。下面代码可以启动sparkSQL CLI:

    ./bin/spark-sql

    与其他系统兼容

    shark用户迁移指南

    调度

    通过SET spark.sql.thriftserver.scheduler.pool=accounting;来设置jdbc客户端会话的公平调度池。

    reducer数量

    在shark中,默认的reducer数量为1并且通过mapred.reduce.tasks属性进行设置,sparkSQL已经废弃了该属性,以spark.sql.shuffle.partitions(默认值为200)代替,通过以下代码进行设置:

    SET spark.sql.shuffle.partitions=10;
    SELECT page, count(*) c
    FROM logs_last_month_cached
    GROUP BY page ORDER BY c DESC LIMIT 10;

    还可以将该属性放在hive-site.xml文件中覆盖默认值。到目前,mapred.reduce.tasks属性还可以使用,会被自动转换为spark.sql.shuffle.partitions。

    缓存

    spark.cache表属性也不再可用了,任何以_cached结尾的表名也不会被缓存了,现在通过CACHE TABLE和UNCACHE TABLE两个语句来控制表缓存:

    CACHE TABLE logs_last_month;
    UNCACHE TABLE logs_last_month;

    注意:CACHE TABLE tbl是懒加载的,类似RDD中的.cache方法,该命令只是将tbl标记为当被计算的时候确保partitions被缓存,只有等tbl真正被执行的时候才会缓存。如果希望立即被缓存,使用如下方式:

    CACHE TABLE logs_last_month;
    SELECT COUNT(1) FROM logs_last_month;

    与hive兼容

    sparkSQL可以与hive metastore,serdes和UDF兼容。sparkSQL thrift jdbc服务能够与当前已经安装的hive进行兼容,不需要修改已存在的hive metastore或者改变数据存放目录。

    支持的hive特性

    sparkSQL支持很多hive特性,如:

  • hive 查询语句:select,group by,order by,cluster by,sort by
  • 所有hive操作:关系型操作(=, ⇔, ==, <>, <, >, >=, <=等),算术操作(+, -, *, /, %等),裸机操作(AND, &&, OR, ||等),复杂类型构造,数学函数(sign, ln, cos等),字符串函数(instr, length, printf等)
  • 用户定义函数(UDF)
  • 用户定义聚合函数(UDAF)
  • 用户定义序列化格式(serdes)
  • join:join,{left|right|full} outer join,left semi join,cross join
  • 联合查询
  • 子查询:select col from(select a+b as col from t1)t2
  • 取样操作
  • 解释操作
  • 表分割
  • 所有的hive DDL函数:create table,create table as select,alter table
  • 大部分的hive数据类型:TINYINT,SMALLINT,INT,BIGINT,BOOLEAN,FLOAT,DOUBLE,STRING,BINARY,TIMESTAMP,ARRAY<>,MAP<>,STRUCT<>

    不支持的hive功能

    主要的hive特性

  • sparkSQL目前不支持使用动态分片向表插入数据
  • sparkSQL不支持带有桶的表,桶是hive表分片的hash分片方式

    深奥的hive特性

  • hive中带有分片的表使用不同的输入格式,在sparkSQL中,所有的表分片使用相同的输入格式
  • hive支持在join操作使用non-equi的条件(如key<10),在sparkSQL中如果使用这种方式会输出错误的结果
  • UNION和DATE类型
  • 唯一join
  • 单查询多插入
  • sparkSQL不支持piggyback浏览来收集列统计,只支持操作hive metastore中的sizeInBytes字段

    hive输入输出格式

  • sparkSQL只支持TextOutputFormat
  • hadoop归档文件

    hive优化

    很多的hive优化目前都不能用户sparkSQL,其中一些(如索引)对于sparkSQL不是很重要的,因为sparkSQL是内存计算模型,其他的一些会在未来的sparkSQL版本中得到支持:

  • 块级别的bitmap索引和虚拟字段(用来建立索引)
  • 自动将join转换为map join:在大表与很多小表进行join时,hive会自动将join转换为map join,下个版本的sparkSQL会得到支持
  • 自动决定join和groupby时reducer的数量,目前,sparkSQL需要使用SET spark.sql.shuffle.partitions=[num_tasks];来控制并行度
  • 只查询元数据:hive在查询时可以只查询元数据,sparkSQL需要部署任务来计算结果
  • sparkSQL不支持hive中的skew data flag
  • sparkSQL不支持hive的STREAMTABLE
  • 合并多个小文件作为查询结果:如果查询结果包括很多的小文件,hive可以合并这些小文件为大文件避免HDFS元数据容量溢出。sparkSQL暂时不支持该特性

    书写语言集成的关系型查询

    语言集成查询是高级特性并只在scala中支持。如:

    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Importing the SQL context gives access to all the public SQL functions and implicit conversions.
    import sqlContext._
    val people: RDD[Person] = ... // An RDD of case class objects, from the first example.
    // The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
    val teenagers = people.where('age >= 10).where('age <= 19).select('name)
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

    使用'符号会隐式将相关语句转换为sql表达式,相关详细信息可查看scalaDoc

    sparkSQL数据类型

  • 数字类型:Byte,Short,Integer,Long,Float,Double,Decimal
  • 字符串类型:String
  • 二进制类型:Binary
  • 布尔类型:Boolean
  • 日期类型:Timestamp
  • 复杂类型:Array(elementType,containsNull),表示一系列的elementType类型的数组,containsNull表示数组中的值是否允许为null;Map(keyType,valueType,valueContainsNull):表示一系列的key-value值对;Struct(fields):表示由一系列的StructFields组成的结构体,StructFields的定义为StructField(name,dataType,nullable) 所有的sparkSQL数据类型都在org.apache.spark.sql包中,通过import org.apache.spark.sql._来导入所有的数据类型。

    数据类型 scala中值类型 访问api或创建数据类型
    ByteType Byte ByteType
    ShortType Short ShortType
    IntegerType Int IntegerType
    LongType Long LongType
    FloatType Float FloatType
    Double Double DoubleType
    DecimalType scala.math.sql.BigDecimal DecimalType
    StringType String StringType
    BinaryType Array[Byte] BinaryType
    BooleanType Boolean BooleanType
    TimestampType java.sql.Timestamp TimestampType
    ArrayType scala.collection.Seq ArrayType(elementType,[containsNull]),containsNull默认值为false
    MapType scala.collection.Map MapType(keyType,valueType,[valueContainsNull]),valueContainsNull默认为true
    StructType org.apache.spark.sql.Row StructType(fields),fields是StructFields的Seq,两个名字相同的fields是不允许的
    StructField 字段数据类型在scala的值类型 StructField(name,dataType,nullable)


  • Previous     Next
    uohzoaix /
    Published under (CC) BY-NC-SA in categories spark  tagged with