Spark SQL 2.3.0:深入浅出

猪小花1号2018-09-13 18:25

作者: GitChat


001

Spark SQL 概述

Spark SQL is Apache Spark's module for working with structured data.

集成
Spark Sql 可以使用 SQL 或者所熟悉的 DataFrame API 在 spark 程序中查询结构化的数据。

统一的数据访问
Spark Sql 提供了访问各种数据源的常用方法,包括 Hive,Avro,Parquet,ORC,JSON 和 JDBC。

兼容 Hive
Spark SQL 支持 HiveQL 语法以及 Hive SerDes 和 UDF,允许您访问现有的 Hive 仓库。

标准连接
Spark SQL 可以通过 JDBC 或 ODBC 连接外部的BI工具。

Spark SQL 架构

首先,Spark SQL 对外提供了多种访问方式,我们可以通过 Hive Hql、Spark 编程的方式(SQL 或者 DataFram/Dataset API)、Streaming SQL 的方式提交执行程序。


然后,会生成一个未完全解析的逻辑执行计划,再集合内部 schema 信息生生一个逻辑执行计划,最后再经过优化,最终生成一个优化后的逻辑执行计划,而这一切就是 Spark SQL 的核心 —— Catalyst 来完成。

最后,将优化后的逻辑执行计划交由 Spark Engine 来翻译执行我们提交的作业。


002

DataFrame&Dateset

DataFrame 产生背景:

DataFrame 并不是 Spark SQL 提出的,而是早起在 R/Pandas 语言就已经存在,但由于 R/Pandas 只能满足单机上的一些数据处理需求,无法完成一些大数据量的任务,但是 Spark SQL 作为 Spark 的模块,可以借助 Spark 的大数据处理性能,完全胜任这些大数据量的处理任务,另外由于 DataFrame 早期已存在,因此在编写 Spark SQL 程序的时候,无疑降低了不小的门槛。

DataFrame 概述:

以列的形式构成的分布式数据集,按照列赋予不同的名称(相当于加上了 schema 的 RDD)。可以看做是一个经过优化后的一个数据表(table)。

提供了类 SQL 的 API 如:select/filter/aggregation/where 操作结构化的数据。将 R/Pandas 处理小数据的经验复用到分布式的大数据上,应为它的灵感来自 R/Pandas。

DataFrame 基本 API:

  1. Create DataFrame
  2. printSchema
  3. show
  4. select
  5. filter
  6. ...

样例数据(people.json):

{"name":"张三", "age":18, "sex":"man"}
{"name":"李四", "age":28, "sex":"female"}
{"name":"王五",  "sex":"man"}

Spark SQL 编程:

val spark = SparkSession.builder().conf(sparkConf).getOrCreate()// 创建DataFrame 将json文件加载为DataFrameval peopleDF = spark.read.format("json").load("/data/people.json")//  输出Schema信息peopleDF.printSchema()//  展示结果---默认展示20条peopleDF.show()//  查询某一列peopleDF.select(peopleDF("name"))// 过滤出大于20岁的人peopleDF.filter("age > 20").show()

输出结果如下:

//  输出Schema信息
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)//  展示结果---默认展示20条+----+----+------+| age|name|   sex|
+----+----+------+|  18|  张三|   man|
|  28|  李四|female|
|null|  王五|   man|
+----+----+------+//  查询某一列
+----+|name|
+----+|  张三|
|  李四|
|  王五|
+----+// 过滤出大于20岁的人
+---+----+------+|age|name|   sex|
+---+----+------+| 28|  李四|female|
+---+----+------+


003

DataFrame & RDD 互操作

反射的方式:

此种方式需要定义一个 object 类,在 spark 中可以直接使用 case class 来实现:

编程代码:

// 定义object
 case class people(id: Long, name: String, sex: String)

 val info = spark.sparkContext.textFile("/data/info.txt") //    此处一定要导入隐式转换,否则无法使用toDF()方法
 import spark.implicits._
 val infoDF = info.map(_.split(",")).map(line => people(line(0).toLong, line(1), line(2)))
        .toDF()

 infoDF.show()

输出结果:

+---+--------+------+| id|    name|   sex|+---+--------+------+|  1|zhangsan|   man||  2|    lisi|female||  3|  wangwu|   man|+---+--------+------+


编程方式:

  1. 将一个基本的 RDD 转换为一个 Row RDD;
  2. 定义 Schema 信息;
  3. 调用 SparkSession createDataFrame 方法创建 DataFrame;

编程代码:

//    将RDD转换为Row RDDval info = spark.sparkContext.textFile("/data/info.txt")
val infoRDD = info.map(_.split(",")).map(line => Row(line(0).toLong, line(1), line(2)))//    定义StrctTypeval structType = StructType(Array(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("sex", StringType, true)
 ))//    调用方法创建DataFrameval infoDF = spark.createDataFrame(infoRDD, structType)//    输出结果infoDF.show()

输出结果:

+---+--------+------+| id|    name|   sex|+---+--------+------+|  1|zhangsan|   man||  2|    lisi|female||  3|  wangwu|   man|+---+--------+------+

反射 & 编程方式对比小结

  1. 如果已知数据结构,可以使用方式一反射的方式将 rdd 转换为 DataFrame,但是此种方式依赖于 case class,spark 中关于样例类的属性是有限制的,当字段过多的时候反射这种方式就无法使用。
  2. 在数据结构未知时,可以使用第二种方式,此种方式编程较第一种方式稍过繁琐,但使用范围更广一些。


今日抄书:

贤者弗拉巴米尔·彭的格言,

“你最热爱的歌曲,其实他们都是在骗你,他们并不是为你唱,只是希望你开心。”


本文来自知乎,经作者 GitChat 授权发布