Spark SQL

07-14 1805阅读

Spark SQL

Spark SQL

Spark SQL

一、Spark SQL架构

  • 能够直接访问现存的Hive数据

  • 提供JDBC/ODBC接口供第三方工具借助Spark进行数据处理

  • 提供更高层级的接口方便处理数据

  • 支持多种操作方式:SQL、API编程

    • API编程:Spark SQL基于SQL开发了一套SQL语句的算子,名称和标准的SQL语句相似
    • 支持Parquet、CSV、JSON、RDBMS、Hive、HBase等多种外部数据源。(掌握多种数据读取方式)

      Spark SQL

    • Spark SQL核心:是RDD+Schema(算子+表结构),为了更方便我们操作,会将RDD+Schema发给DataFrame

    • 数据回灌:用于将处理和清洗后的数据回写到Hive中,以供后续分析和使用。

    • BI Tools:主要用于数据呈现。

    • Spark Application:开发人员使用Spark Application编写数据处理和分析逻辑,这些应用可以用不同的编程语言编写,比如Python、Scala、Java等。

      二、Spark SQL运行原理

      Spark SQL

      • Catalyst优化器的运行流程:
        1. Frontend(前端)
          • 输入:用户可以通过SQL查询或DataFrame API来输入数据处理逻辑。
          • Unresolved Logical Plan(未解析的逻辑计划):输入的SQL查询或DataFrame转换操作会首先被转换为一个未解析的逻辑计划,这个计划包含了用户请求的所有操作,但其中的表名和列名等可能尚未解析。
          • Catalyst Optimizer(Catalyst优化器) Catalyst优化器是Spark SQL的核心组件,它负责将逻辑计划转换为物理执行计划,并进行优化。Catalyst优化器包括以下几个阶段:
            • Analysis(分析):将未解析的逻辑计划中的表名和列名解析为具体的元数据,这一步依赖于Catalog(元数据存储)。输出是一个解析后的逻辑计划。
            • Logical Optimization(逻辑优化):对解析后的逻辑计划进行各种优化,如投影剪切、过滤下推等。优化后的逻辑计划更加高效。
            • Physical Planning(物理计划):将优化后的逻辑计划转换为一个或多个物理执行计划。每个物理计划都代表了一种可能的执行方式。
            • Cost Model(成本模型):评估不同物理计划的执行成本,选择代价最低的物理计划作为最终的物理计划。
            • Backend(后端)
              • Code Generation(代码生成):将选择的物理计划转换为可以在Spark上执行的RDD操作。这一步会生成实际的执行代码。
              • RDDs:最终生成的RDD操作被执行,以完成用户请求的数据处理任务。
        • 一个SQL查询在Spark SQL中的优化流程
          SELECT name FROM(
            SELECT id, name FROM people
          ) p
          WHERE p.id = 1
          

          Spark SQL

          • Filter下压:将Filter操作推到更靠近数据源的位置,以减少不必要的数据处理。
          • 合并Projection:减少不必要的列选择
          • IndexLookup return:name:如果存在索引,可以直接通过索引查找并返回name列

            三、Spark SQL API

            1. SparkContext:Spark应用的主入口,代表了与Spark集群的连接。

            2. SQLContext:Spark SQL的编程入口,使用SQLContext可以运行SQL查询、加载数据源和创建DataFrame。

            3. HiveContext:SQLContext的一个子集,可以执行HiveQL查询,并且可以访问Hive元数据和UDF。

            4. SparkSession:Spark2.0后推荐使用,合并了SQLContext和HiveContext,提供了与Spark所有功能交互的单一入口点。

              创建一个SparkSession就包含了一个SparkContext。

            5. 若同时需要创建SparkContext和SparkSession,必须先创建SparkContext再创建SparkSession。否则,会抛出如下异常,提示重复创建SparkContext:

            详细解释
            创建SparkSession的代码
            val conf: SparkConf = new SparkConf()
            	.setMaster("local[4]")
            	.setAppName("SparkSql")
            def main(args: Array[String]): Unit = { 
            	SparkSession.builder()
            		.config(conf)
            		.getOrCreate()
            }
            
            优化:减少创建代码,SparkSessionBuilder工具类
            package com.ybg
            import org.apache.spark.{SparkConf, SparkContext}
            import org.apache.spark.sql.SparkSession
            // 封装SparkSession的创建方法
            class SparkSessionBuilder(master:String,appName:String){
              lazy val config:SparkConf = {
                new SparkConf()
                  .setMaster(master)
                  .setAppName(appName)
              }
              lazy val spark:SparkSession = {
                SparkSession.builder()
                  .config(config)
                  .getOrCreate()
              }
              lazy val sc:SparkContext = {
                spark.sparkContext
              }
              def stop(): Unit = {
                if (null != spark) {
                  spark.stop()
                }
              }
            }
            object SparkSessionBuilder {
              def apply(master: String, appName: String): SparkSessionBuilder = new SparkSessionBuilder(master, appName)
            }
            

            四、Spark SQL依赖

            pom.xml
              8
              8
              UTF-8
              3.1.2
              2.12
              3.1.3
              8.0.33
              3.1.2
              2.3.5
              2.10.0
            
            
              
              
                org.apache.spark
                spark-core_${spark.scala.version}
                ${spark.version}
              
              
              
                org.apache.spark
                spark-sql_${spark.scala.version}
                ${spark.version}
              
              若出现如下异常:
              Caused by: com.fasterxml.jackson.databind.JsonMappingException: 
              Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and 
              
              
                  com.fasterxml.jackson.core
                  jackson-databind
                  2.10.0
              
              
              
              
                com.mysql
                mysql-connector-j
                ${mysql.version}
              
            
            
            log4j.properties

            log4j.properties应该放在资源包下。

            log4j.rootLogger=ERROR, stdout, logfile # 设置可显示的信息等级
            log4j.appender.stdout=org.apache.log4j.ConsoleAppender
            log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
            log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
            log4j.appender.logfile=org.apache.log4j.FileAppender
            log4j.appender.logfile.File=log/spark_first.log
            log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
            log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
            

            五、Spark SQL数据集

            1、DataSet
            • 简介:
              • 从Spark 1.6开始引入的新的抽象。
              • 是特定领域对象中的强类型集合。
              • 可以使用函数式编程或SQL查询进行操作。
              • 等于RDD + Schema。
                2、DataFrame
                • 简介:
                  • DataFrame是特殊的DataSet:DataFrame=DataSet[Row],行对象的集合,每一行就是一个行对象。
                  • 类似于传统数据的二维表格。
                  • 特性:
                    • Schema:在RDD基础上增加了Schema,描述数据结构信息
                    • 嵌套数据类型:支持struct,map,array等嵌套数据类型。
                    • API:提供类似SQL的操作接口。
                      详细解释
                      创建DataSet的代码
                      val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
                      // 提供了一组隐式转换,这些转换允许将Scala的本地集合类型(如Seq、Array、List等)转换为Spark的DataSet。
                      import spark.implicits._
                      val dsPhone: Dataset[Product] = spark.createDataset(Seq(
                        Product(1, "Huawei Mate60", 5888.0f),
                        Product(2, "IPhone", 5666.0f),
                        Product(3, "OPPO", 1888.0f)
                      ))
                      dsPhone.printSchema()
                      /**
                       * root
                       * |-- id: integer (nullable = false)
                       * |-- name: string (nullable = true)
                       * |-- price: float (nullable = false)
                       */
                      
                      创建DataFrame的代码
                      • 读取CSV文件

                        • 对于CSV文件,在构建DataFrame之前,必须要先创建一个Schema,再根据文件类型分不同情况进行导入。(读取JSON文件或者数据库表都并不需要)

                        • 注意:必须要import spark.implicits._,导入隐式类,才能够识别一些隐式转换,否则会报错。

                        • CSV文件在创建DataFrame时,可以选择尽量模仿Hive中的OpenCSVSerDe的

                          val spark: SparkSession = SparkSession.builder()
                          .config(conf)
                          .getOrCreate()
                          import spark.implicits._
                          val schema: StructType = StructType(
                            Seq(
                              StructField("user_id", LongType),
                              StructField("locale", StringType),
                              StructField("birthYear", IntegerType),
                              StructField("gender", StringType),
                              StructField("joinedAt", StringType),
                              StructField("location", StringType),
                              StructField("timezone", StringType)
                            )
                          )
                          val frmUsers: DataFrame = spark.read
                          .schema(schema)
                          .option("separator", ",") // 指定文件分割符
                          .option("header", "true") // 指定CSV文件包含表头
                          .option("quoteChar", "\"")
                          .option("escapeChar", "\\")
                          .csv("C:\\Users\\lenovo\\Desktop\\users.csv")
                          .repartition(4)
                          .cache()
                          
                          • 读取JSON文件
                            val frmUsers2: DataFrame = spark.read.json("hdfs://single01:9000/spark/cha02/users.json")
                            frmUsers2.show()
                            
                            • 读取数据库表
                              val url = "jdbc:mysql://single01:3306/test_db_for_bigdata" // 数据库连接地址
                              val mysql = new Properties()
                              mysql.setProperty("driver", "com.mysql.cj.jdbc.Driver")
                              mysql.setProperty("user", "root")
                              mysql.setProperty("password", "123456")
                              spark
                                .read
                                .jdbc(url,"test_table1_for_hbase_import",mysql) // (url,TableName,连接属性)
                                .show(100)
                              

                              六、Spark_SQL的两种编码方式

                              val spark: SparkSession = SparkSession.builder()
                              .config(conf)
                              .getOrCreate()
                              import spark.implicits._
                              val schema: StructType = StructType(
                                Seq(
                                  StructField("user_id", LongType),
                                  StructField("locale", StringType),
                                  StructField("birthYear", IntegerType),
                                  StructField("gender", StringType),
                                  StructField("joinedAt", StringType),
                                  StructField("location", StringType),
                                  StructField("timezone", StringType)
                                )
                              )
                              val frmUsers: DataFrame = spark.read
                              .schema(schema)
                              .option("separator", ",") // 指定文件分割符
                              .option("header", "true") // 指定CSV文件包含表头
                              .option("quoteChar", "\"")
                              .option("escapeChar", "\\")
                              .csv("C:\\Users\\lenovo\\Desktop\\users.csv")
                              .repartition(4)
                              .cache()
                              

                              此处已经创建好了DataFrame

                              1. 面向标准SQL语句(偷懒用)
                              frmUsers.registerTempTable("user_info") // 此方法已过期
                              spark.sql(
                                    """
                                      |select * from user_info
                                      |where gender='female'
                                      |""".stripMargin)
                              		.show(10)
                              
                              2. 使用Spark中的SQL算子(更规范)
                              frmUsers
                                    .where($"birthYear">1990)
                                    .groupBy($"locale")
                                    .agg(
                                      count($"locale").as("locale_count"),
                                      round(avg($"birthYear"),2).as("avg_birth_year")
                                    )
                                    .where($"locale_count">=10 and $"avg_birth_year">=1993)
                                    .orderBy($"locale_count".desc)
                                    .select(
                                      $"locale", $"locale_count", $"avg_birth_year",
                                      dense_rank()
                                        .over(win)
                                        .as("rnk_by_locale_count"),
                                      lag($"locale_count",1)
                                        .over(win)
                                        .as("last_locale_count")
                                    )
                                    .show(10)
                              

                              七、常用算子

                              1.基本SQL模板
                              select
                              		col,cols*,agg*
                              where
                              		conditionCols
                              group by
                              		col,cols*
                              having
                              		condition
                              order by
                              		col asc|desc
                              limit
                              		n
                              
                              2.select

                              select语句在代码的开头可以不写,因为有后续的类似where和group by语句已经对列进行了操作,指明了列名。如果后续有select语句,则优先按照后面的select语句进行。

                              frmUsers.select(
                              	$"locale",$"locale_count"
                              )
                              
                              3.agg
                              .agg(
                                count($"locale").as("locale_count"),
                                round(avg($"birthYear"),2).as("avg_birth_year")
                              )
                              
                              4.窗口函数
                              • over子句

                                注意:over子句中的分区信息是可以被重用的

                                val win: WindowSpec = Window.partitionBy($"gender").orderBy($"locale_count".desc)
                                frmUsers
                                  ...
                                  .select(
                                		dense_rank()
                                  		.over(win)
                                  		.as("rnk_by_locale_count")
                                	)
                                
                                5.show

                                show(N)表示显示符合条件的至多N条数据。(不是取前N条再提取出其中符合条件的数据)

                                frmUsers
                                  ...
                                  .show(10)
                                
                                6.条件筛选 where
                                newCol:Column = $"cus_state".isNull
                                newCol:Column = $"cus_state".isNaN
                                newCol:Column = $"cus_state".isNotNull
                                newCol:Column = $"cus_state".gt(10)			$"cus_state">10
                                newCol:Column = $"cus_state".geq(10)		$"cus_state">=10
                                newCol:Column = $"cus_state".lt(10)			$"cus_state"
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]