Spark SQL
Spark SQL
一、Spark SQL架构
-
能够直接访问现存的Hive数据
-
提供JDBC/ODBC接口供第三方工具借助Spark进行数据处理
-
提供更高层级的接口方便处理数据
-
支持多种操作方式:SQL、API编程
- API编程:Spark SQL基于SQL开发了一套SQL语句的算子,名称和标准的SQL语句相似
-
支持Parquet、CSV、JSON、RDBMS、Hive、HBase等多种外部数据源。(掌握多种数据读取方式)
-
Spark SQL核心:是RDD+Schema(算子+表结构),为了更方便我们操作,会将RDD+Schema发给DataFrame
-
数据回灌:用于将处理和清洗后的数据回写到Hive中,以供后续分析和使用。
-
BI Tools:主要用于数据呈现。
-
Spark Application:开发人员使用Spark Application编写数据处理和分析逻辑,这些应用可以用不同的编程语言编写,比如Python、Scala、Java等。
二、Spark SQL运行原理
- Catalyst优化器的运行流程:
- Frontend(前端)
- 输入:用户可以通过SQL查询或DataFrame API来输入数据处理逻辑。
- Unresolved Logical Plan(未解析的逻辑计划):输入的SQL查询或DataFrame转换操作会首先被转换为一个未解析的逻辑计划,这个计划包含了用户请求的所有操作,但其中的表名和列名等可能尚未解析。
- Catalyst Optimizer(Catalyst优化器) Catalyst优化器是Spark SQL的核心组件,它负责将逻辑计划转换为物理执行计划,并进行优化。Catalyst优化器包括以下几个阶段:
- Analysis(分析):将未解析的逻辑计划中的表名和列名解析为具体的元数据,这一步依赖于Catalog(元数据存储)。输出是一个解析后的逻辑计划。
- Logical Optimization(逻辑优化):对解析后的逻辑计划进行各种优化,如投影剪切、过滤下推等。优化后的逻辑计划更加高效。
- Physical Planning(物理计划):将优化后的逻辑计划转换为一个或多个物理执行计划。每个物理计划都代表了一种可能的执行方式。
- Cost Model(成本模型):评估不同物理计划的执行成本,选择代价最低的物理计划作为最终的物理计划。
- Backend(后端)
- Code Generation(代码生成):将选择的物理计划转换为可以在Spark上执行的RDD操作。这一步会生成实际的执行代码。
- RDDs:最终生成的RDD操作被执行,以完成用户请求的数据处理任务。
- 一个SQL查询在Spark SQL中的优化流程
SELECT name FROM( SELECT id, name FROM people ) p WHERE p.id = 1
- Filter下压:将Filter操作推到更靠近数据源的位置,以减少不必要的数据处理。
- 合并Projection:减少不必要的列选择
- IndexLookup return:name:如果存在索引,可以直接通过索引查找并返回name列
三、Spark SQL API
-
SparkContext:Spark应用的主入口,代表了与Spark集群的连接。
-
SQLContext:Spark SQL的编程入口,使用SQLContext可以运行SQL查询、加载数据源和创建DataFrame。
-
HiveContext:SQLContext的一个子集,可以执行HiveQL查询,并且可以访问Hive元数据和UDF。
-
SparkSession:Spark2.0后推荐使用,合并了SQLContext和HiveContext,提供了与Spark所有功能交互的单一入口点。
创建一个SparkSession就包含了一个SparkContext。
-
若同时需要创建SparkContext和SparkSession,必须先创建SparkContext再创建SparkSession。否则,会抛出如下异常,提示重复创建SparkContext:
详细解释
创建SparkSession的代码
val conf: SparkConf = new SparkConf() .setMaster("local[4]") .setAppName("SparkSql") def main(args: Array[String]): Unit = { SparkSession.builder() .config(conf) .getOrCreate() }
优化:减少创建代码,SparkSessionBuilder工具类
package com.ybg import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession // 封装SparkSession的创建方法 class SparkSessionBuilder(master:String,appName:String){ lazy val config:SparkConf = { new SparkConf() .setMaster(master) .setAppName(appName) } lazy val spark:SparkSession = { SparkSession.builder() .config(config) .getOrCreate() } lazy val sc:SparkContext = { spark.sparkContext } def stop(): Unit = { if (null != spark) { spark.stop() } } } object SparkSessionBuilder { def apply(master: String, appName: String): SparkSessionBuilder = new SparkSessionBuilder(master, appName) }
四、Spark SQL依赖
pom.xml
8 8 UTF-8 3.1.2 2.12 3.1.3 8.0.33 3.1.2 2.3.5 2.10.0 org.apache.spark spark-core_${spark.scala.version} ${spark.version} org.apache.spark spark-sql_${spark.scala.version} ${spark.version} 若出现如下异常: Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and com.fasterxml.jackson.core jackson-databind 2.10.0 com.mysql mysql-connector-j ${mysql.version}
log4j.properties
log4j.properties应该放在资源包下。
log4j.rootLogger=ERROR, stdout, logfile # 设置可显示的信息等级 log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=log/spark_first.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
五、Spark SQL数据集
1、DataSet
- 简介:
- 从Spark 1.6开始引入的新的抽象。
- 是特定领域对象中的强类型集合。
- 可以使用函数式编程或SQL查询进行操作。
- 等于RDD + Schema。
2、DataFrame
- 简介:
- DataFrame是特殊的DataSet:DataFrame=DataSet[Row],行对象的集合,每一行就是一个行对象。
- 类似于传统数据的二维表格。
- 特性:
- Schema:在RDD基础上增加了Schema,描述数据结构信息
- 嵌套数据类型:支持struct,map,array等嵌套数据类型。
- API:提供类似SQL的操作接口。
详细解释
创建DataSet的代码
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() // 提供了一组隐式转换,这些转换允许将Scala的本地集合类型(如Seq、Array、List等)转换为Spark的DataSet。 import spark.implicits._ val dsPhone: Dataset[Product] = spark.createDataset(Seq( Product(1, "Huawei Mate60", 5888.0f), Product(2, "IPhone", 5666.0f), Product(3, "OPPO", 1888.0f) )) dsPhone.printSchema() /** * root * |-- id: integer (nullable = false) * |-- name: string (nullable = true) * |-- price: float (nullable = false) */
创建DataFrame的代码
-
读取CSV文件
-
对于CSV文件,在构建DataFrame之前,必须要先创建一个Schema,再根据文件类型分不同情况进行导入。(读取JSON文件或者数据库表都并不需要)
-
注意:必须要import spark.implicits._,导入隐式类,才能够识别一些隐式转换,否则会报错。
-
CSV文件在创建DataFrame时,可以选择尽量模仿Hive中的OpenCSVSerDe的
val spark: SparkSession = SparkSession.builder() .config(conf) .getOrCreate() import spark.implicits._ val schema: StructType = StructType( Seq( StructField("user_id", LongType), StructField("locale", StringType), StructField("birthYear", IntegerType), StructField("gender", StringType), StructField("joinedAt", StringType), StructField("location", StringType), StructField("timezone", StringType) ) ) val frmUsers: DataFrame = spark.read .schema(schema) .option("separator", ",") // 指定文件分割符 .option("header", "true") // 指定CSV文件包含表头 .option("quoteChar", "\"") .option("escapeChar", "\\") .csv("C:\\Users\\lenovo\\Desktop\\users.csv") .repartition(4) .cache()
- 读取JSON文件
val frmUsers2: DataFrame = spark.read.json("hdfs://single01:9000/spark/cha02/users.json") frmUsers2.show()
- 读取数据库表
val url = "jdbc:mysql://single01:3306/test_db_for_bigdata" // 数据库连接地址 val mysql = new Properties() mysql.setProperty("driver", "com.mysql.cj.jdbc.Driver") mysql.setProperty("user", "root") mysql.setProperty("password", "123456") spark .read .jdbc(url,"test_table1_for_hbase_import",mysql) // (url,TableName,连接属性) .show(100)
六、Spark_SQL的两种编码方式
val spark: SparkSession = SparkSession.builder() .config(conf) .getOrCreate() import spark.implicits._ val schema: StructType = StructType( Seq( StructField("user_id", LongType), StructField("locale", StringType), StructField("birthYear", IntegerType), StructField("gender", StringType), StructField("joinedAt", StringType), StructField("location", StringType), StructField("timezone", StringType) ) ) val frmUsers: DataFrame = spark.read .schema(schema) .option("separator", ",") // 指定文件分割符 .option("header", "true") // 指定CSV文件包含表头 .option("quoteChar", "\"") .option("escapeChar", "\\") .csv("C:\\Users\\lenovo\\Desktop\\users.csv") .repartition(4) .cache()
此处已经创建好了DataFrame
1. 面向标准SQL语句(偷懒用)
frmUsers.registerTempTable("user_info") // 此方法已过期 spark.sql( """ |select * from user_info |where gender='female' |""".stripMargin) .show(10)
2. 使用Spark中的SQL算子(更规范)
frmUsers .where($"birthYear">1990) .groupBy($"locale") .agg( count($"locale").as("locale_count"), round(avg($"birthYear"),2).as("avg_birth_year") ) .where($"locale_count">=10 and $"avg_birth_year">=1993) .orderBy($"locale_count".desc) .select( $"locale", $"locale_count", $"avg_birth_year", dense_rank() .over(win) .as("rnk_by_locale_count"), lag($"locale_count",1) .over(win) .as("last_locale_count") ) .show(10)
七、常用算子
1.基本SQL模板
select col,cols*,agg* where conditionCols group by col,cols* having condition order by col asc|desc limit n
2.select
select语句在代码的开头可以不写,因为有后续的类似where和group by语句已经对列进行了操作,指明了列名。如果后续有select语句,则优先按照后面的select语句进行。
frmUsers.select( $"locale",$"locale_count" )
3.agg
.agg( count($"locale").as("locale_count"), round(avg($"birthYear"),2).as("avg_birth_year") )
4.窗口函数
- over子句
注意:over子句中的分区信息是可以被重用的
val win: WindowSpec = Window.partitionBy($"gender").orderBy($"locale_count".desc) frmUsers ... .select( dense_rank() .over(win) .as("rnk_by_locale_count") )
5.show
show(N)表示显示符合条件的至多N条数据。(不是取前N条再提取出其中符合条件的数据)
frmUsers ... .show(10)
6.条件筛选 where
newCol:Column = $"cus_state".isNull newCol:Column = $"cus_state".isNaN newCol:Column = $"cus_state".isNotNull newCol:Column = $"cus_state".gt(10) $"cus_state">10 newCol:Column = $"cus_state".geq(10) $"cus_state">=10 newCol:Column = $"cus_state".lt(10) $"cus_state"
- over子句
- 读取数据库表
- 读取JSON文件
-
-
- 简介:
-
- Frontend(前端)
- Catalyst优化器的运行流程: