实验五 Spark SQL编程初级实践

06-15 1269阅读

Spark SQL编程初级实践

  • Spark SQL基本操作

    将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

    { "id":1 , "name":" Ella" , "age":36 }

    { "id":2, "name":"Bob","age":29 }

    { "id":3 , "name":"Jack","age":29 }

    { "id":4 , "name":"Jim","age":28 }

    { "id":4 , "name":"Jim","age":28 }

    { "id":5 , "name":"Damon" }

    { "id":5 , "name":"Damon" }

    为employee.json创建DataFrame,并写出Scala语句完成下列操作:

    1. 查询所有数据;
    2. 查询所有数据,并去除重复的数据;
    3. 查询所有数据,打印时去除id字段;
    4. 筛选出age>30的记录;
    5. 将数据按age分组;
    6. 将数据按name升序排列;
    7. 取出前3行数据;
    8. 查询所有记录的name列,并为其取别名为username;
    9. 查询年龄age的平均值;
    10. 查询年龄age的最小值。

    • 编程实现将RDD转换为DataFrame

      源文件内容如下(包含id,name,age):

      1,Ella,36

      2,Bob,29

      3,Jack,29

      请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

      • 编程实现利用DataFrame读写MySQL的数据

        (1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如表6-2所示的两行数据。

        表6-2 employee表原有数据

        id

        name

        gender

        Age

        1

        Alice

        F

        22

        2

        John

        M

        25

        (2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表6-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

        表6-3 employee表新增数据

        id

        name

        gender

        age

        3

        Mary

        F

        26

        4

        Tom

        M

        23

        实验一 :Spark SQL基本操作

        1)
        // 导入必要的库
        import org.apache.spark.sql.SparkSession
        // 创建SparkSession
        val spark = SparkSession.builder()
          .appName("Spark SQL Basic Operations")
          .getOrCreate()
        // 读取JSON文件创建DataFrame
         	val df = spark.read.json("file:///home/hadoop/employee.json")
                  // (1) 查询所有数据
        df.show()
        (2)查询所有数据,并去除重复的数据
        df.distinct().show()
        (3)
        查询所有数据,打印时去除id字段
        df.drop("id").show()
        (4)
        筛选出age>30的记录
        df.filter("age > 30").show()
        (5)
        将数据按age分组
        df.groupBy("age").count().show()
        (6)
        将数据按name升序排列
        df.orderBy("name").show()
        (7)
        取出前3行数据
        df.limit(3).show()
        (8)
        查询所有记录的name列,并为其取别名为username
        df.select($"name".alias("username")).show()
        (9)
        查询年龄age的平均值
        df.selectExpr("avg(age)").show()
        (10)
        查询年龄age的最小值
        df.selectExpr("min(age)").show()

        实验二 :编程实现将RDD转换为DataFrame

        编程代码:

        import org.apache.spark.sql.{SparkSession, Row}  
        import org.apache.spark.sql.types._  
          
        object RDDToDataFrameExample {  
          def main(args: Array[String]): Unit = {  
            // 创建SparkSession  
            val spark = SparkSession.builder()  
              .appName("RDD to DataFrame Example")  
              .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  
              .getOrCreate()  
          
            import spark.implicits._  
          
            // 指定employee.txt文件的位置  
            val inputFilePath = "file:///home/hadoop/employee.txt"  
          
            // 从文本文件读取数据创建RDD  
            val rdd = spark.sparkContext.textFile(inputFilePath)  
          
            // 定义DataFrame的schema  
            val schema = StructType(Array(  
              StructField("id", IntegerType, nullable = false),  
              StructField("name", StringType, nullable = false),  
              StructField("age", IntegerType, nullable = false)  
            ))  
          
            // 将RDD转换为DataFrame  
            val dataFrame = spark.createDataFrame(rdd.map { line =>  
              val parts = line.split(",")  
              Row(parts(0).toInt, parts(1), parts(2).toInt)  
            }, schema)  
          
            // 显示DataFrame内容  
            dataFrame.show(false)  
          
            // 按照指定格式打印所有数据  
            dataFrame.collect().foreach { row =>  
              println(s"id:${row.getAs[Int]("id")},name:${row.getAs[String]("name")},age:${row.getAs[Int]("age")}")  
            }  
          
            // 停止SparkSession  
            spark.stop()  
          }  
        }

         命令

        /usr/local/spark-3.5.1/bin/spark-submit --class "RDDToDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

         具体操作参考博客

        如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

        实验三:编程实现利用DataFrame读写MySQL的数据

        mysql代码

        CREATE DATABASE sparktest;  
        USE sparktest;  
          
        CREATE TABLE employee (  
          id INT PRIMARY KEY,  
          name VARCHAR(50),  
          gender CHAR(1),  
          age INT  
        );  
          
        INSERT INTO employee (id, name, gender, age) VALUES (1, 'Alice', 'F', 22);  
        INSERT INTO employee (id, name, gender, age) VALUES (2, 'John', 'M', 25);

        如何安装msyql参考博客

         在ubuntu上安装mysql(在线安装需要)-CSDN博客

        如何安装mysl驱动程序jar包-CSDN博客

        编程代码

        import org.apache.spark.sql.{SparkSession, Row}  
        import java.util.Properties  
        import org.apache.spark.sql.SparkSession  
        import org.apache.spark.sql.Dataset  
        import org.apache.spark.sql.Row  
        import org.apache.spark.sql.functions.max  
        import org.apache.spark.sql.functions.sum  
          
        object MySQLDataFrameExample {  
          def main(args: Array[String]): Unit = {  
            // 创建SparkSession  
            val spark = SparkSession.builder()  
              .appName("MySQL DataFrame Example")  
              .master("local[*]") // 使用本地模式,如果连接到集群请更改这里  
              .getOrCreate()  
          
            import spark.implicits._  
          
            // 配置MySQL JDBC连接  
            val jdbcProperties = new Properties()  
            jdbcProperties.setProperty("user", "root")  
            jdbcProperties.setProperty("password", "mysql")  
            jdbcProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")  
          
            // 定义MySQL的JDBC连接URL  
            val jdbcUrl = "jdbc:mysql://localhost:3306/sparktest"  
          
            // 创建DataFrame以插入数据  
            val newEmployeeData = Seq(  
              (3, "Mary", "F", 26),  
              (4, "Tom", "M", 23)  
            ).toDF("id", "name", "gender", "age")  
          
            // 将DataFrame数据插入到MySQL的employee表中  
            newEmployeeData.write  
              .mode("append") // 使用append模式来添加数据,而不是覆盖  
              .jdbc(jdbcUrl, "employee", jdbcProperties)  
          
            // 从MySQL读取employee表的数据  
            val employeeDF = spark.read  
              .jdbc(jdbcUrl, "employee", jdbcProperties)  
          
            // 打印age的最大值  
            val maxAge = employeeDF.agg(max("age")).collect()(0).getAs[Int](0)  
            println(s"Max age: $maxAge")  
          
            // 打印age的总和  
            val sumAge = employeeDF.agg(sum("age")).collect()(0).getAs[Long](0)  
            println(s"Sum of ages: $sumAge")  
          
            // 停止SparkSession  
            spark.stop()  
          }  
        }
        

        编程详细步骤参考

         如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

         运行命令

        /usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

        产生错误

        主要问题都在实验三中,因为实验三中涉及到一个mysql数据库连接

        命令更新为

        /usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

        加了一个mysl驱动的jar的引用

        如何安装mysql驱动参考博客

        如何安装mysl驱动程序jar包-CSDN博客

        打包失败

        实验五 Spark SQL编程初级实践

        这个问题是代码错误

        代码未引入一些包

        加上下面这些就可以了

        import org.apache.spark.sql.{SparkSession, Row}  

        import java.util.Properties  

        import org.apache.spark.sql.SparkSession  

        import org.apache.spark.sql.Dataset  

        import org.apache.spark.sql.Row  

        import org.apache.spark.sql.functions.max  

        import org.apache.spark.sql.functions.sum  

        运行失败

        实验五 Spark SQL编程初级实践

        未引入mysl驱动程序

        要下载mysql驱动

        实验五 Spark SQL编程初级实践

        采用命令引入

        /usr/local/spark-3.5.1/bin/spark-submit --jars /home/hadoop/mysql-connector-j-8.3.0/mysql-connector-j-8.3.0.jar  --class "MySQLDataFrameExample" ./target/scala-2.12/simple-project_2.12-1.9.0.jar

        参考链接

        如何安装sbt(sbt在ubuntu上的安装与配置)(有详细安装网站和图解)-CSDN博客

        在ubuntu上安装mysql(在线安装需要)-CSDN博客

        在ubuntu上安装mysql(在线安装需要)-CSDN博客

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]