Spark离线开发指南(详细版)

07-02 1645阅读

文章目录

  • 1--Spark—core
      • **2.1--RDD的创建**
        • 2.1.1--并行化创建
        • 2.1.2--获取分区数
        • 2.1.3--读取文件创建RDD
          • textFile
          • wholeTextFile
          • 2.2--RDD算子
            • 2.2.1--算子概念
            • 2.2.2--Transformation算子
              • map
              • flatMap
              • reduceByKey
              • mapValues
              • groupBy
              • filter
              • distinct
              • union
              • join
              • intersection
              • glom
              • groupByKey
              • sortBy
              • sortByKey
              • keys
              • values
              • 2.2.3--Action算子
                • countByKey
                • collect
                • reduce
                • fold
                • aggregate
                • first
                • take
                • top
                • count
                • takeSample
                • takeOrdered
                • foreach
                • saveAsTextFile
                • collectAsMap
                • 2.2.4--分区操作算子
                  • mapPartition
                  • foreachPartition
                  • partitionBy
                  • repartition
                  • coalesce
                  • 2.3--RDD缓存
                    • 2.3.1--RDD缓存的目的
                    • 2.3.1--RDD缓存的特点
                    • 2.3.2--RDD缓存的API
                    • 2.3.3--RDD的CheckPoint
                    • 2.4--广播变量
                      • 2.4.1--概念
                      • 2.4.2--API
                      • 2.5--累加器
                        • 2.5.1--需求
                        • 2.5.2--没有累加器的代码演示
                        • 2.5.3--增加累加器的代码
                        • 2.6--python程序对hdfs文件操作
                        • 2--SparkSQL
                            • 3.1--DataFrame
                              • 3.1.1--SparkSession对象环境的创建
                              • 3.1.2--DataFrame的组成
                              • 3.1.3--DataFrame代码构建
                                • 基于RDD方式1
                                • 基于RDD方式2
                                • 基于RDD方式3
                                • 基于Pandas的DataFrame
                                • 读取外部数据
                                • 读取mysql数据
                                • 读取hive数据
                                • 读取es数据
                                • 3.1.4--DataFrame DSL风格演示
                                • 3.1.5--DataFrame SQL风格演示
                                • 3.1.6--数据清理
                                  • 数据去重
                                  • 缺失值处理
                                  • 3.1.7--数据写出
                                    • 写出到文件
                                    • 写出到mysql
                                    • 写出到hive
                                    • 写出到es
                                    • 3.2--DataFrameAPI
                                        • printSchema
                                        • show
                                        • createTempView
                                        • createOrReplaceTempView
                                        • createGlobalTempView
                                        • select
                                        • join
                                        • filter
                                        • where
                                        • groupBy
                                        • orderBy
                                        • withColumn
                                        • withColumnRenamed
                                        • first
                                        • limit
                                        • dropDuplicates
                                        • dropna
                                        • fillna
                                        • write
                                        • 3.3 类似于RDD的API
                                            • count
                                            • collect
                                            • take
                                            • first
                                            • head
                                            • tail
                                            • foreach
                                            • foreachPartition
                                            • distinct
                                            • union/unionAll
                                            • coalesce/repartition
                                            • cache/persist
                                            • unpersist
                                            • columns
                                            • schema
                                            • rdd
                                            • printSchema
                                            • 3.4--GroupDataAPI
                                                • count
                                                • avg
                                                • sum
                                                • min
                                                • max
                                                • round
                                                • agg
                                                • 3.5--column的常用API
                                                    • alias
                                                    • astype
                                                    • between
                                                    • cast
                                                    • astype
                                                    • contains
                                                    • endswith(other)
                                                    • eqNullSafe(other)
                                                    • lit
                                                    • 3.6--UDF
                                                      • 3.6.1--UDF的定义
                                                      • 3.6.2--UDF的返回值
                                                      • 3.6.3--UDAF by RDD
                                                      • 3.7--窗口函数
                                                        • DSL风格
                                                        • SQL风格
                                                        • 3--spark优化

                                                          1–Spark—core

                                                          2.1–RDD的创建

                                                          2.1.1–并行化创建

                                                          概念:并行化创建是指:将本地集合=>分布式RDD,这一步就是分布式的开端:本地转分布式

                                                          Spark离线开发指南(详细版)
                                                          (图片来源网络,侵删)

                                                          API:

                                                          • parallelize(参数1,参数2)
                                                          • 参数1 集合对象即可 比如list
                                                          • 参数2 分区数,不写默认是电脑的线程数
                                                            # 导入spark相关的包
                                                            from pyspark import SparkConf,SparkContext
                                                            if __name__ == '__main__':
                                                               
                                                                # 初始化sparkcontext的对象
                                                                conf = SparkConf().setAppName("text").setMaster("local[*]")
                                                                sc = SparkContext(conf=conf)
                                                                # 通过并行化的方式创建RDD
                                                                rdd= sc.parallelize([1,2,3,4,5,6,7,8,9])
                                                                 
                                                                # parallelize没有给分区数 默认是电脑cpu的线程
                                                                print("默认分区",rdd.getNumPartitions())
                                                                rdd_par = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
                                                                print("分区数",rdd_par.getNumPartitions())
                                                                # 打印rdd的内容
                                                                # collect方法是将rdd中的每个分区的数据都发送到Driver,
                                                                # 形成一个python的list的对象 分布式对象==>本地集合
                                                                print("内容",rdd_par.collect())
                                                            
                                                            2.1.2–获取分区数

                                                            getNumPartitions() 获取RDD的分区数量 返回值是Int数字

                                                            API:

                                                            • rdd.getNumPartitions()
                                                              2.1.3–读取文件创建RDD
                                                              textFile

                                                              textFile():可以读取本地数据,也可以读取hdfs数据

                                                              API:

                                                              • sparkcontext.textFile(参数1,参数2)

                                                              • 参数1:必填,文件路径支持本地,支持HDFS,也支持一些比如S3协议

                                                              • 参数2:可选,表示最小分区数量

                                                              • 注意:参数2话语权不足,spark有自己的判断,在它的允许的范围内,参数2才有效果,超出spark允许的范围,参数2就失效

                                                                读取本地文件:

                                                                file_rdd1 = sc.textFile("../data/input/a.txt")
                                                                print("默认读取分区数:",file_rdd1.getNumPartitions())
                                                                print("内容",file_rdd1.collect())
                                                                

                                                                参数2的用法:

                                                                # 加最小分区数测试 最小分区数只是个参考值,spark有自己的判断
                                                                file_rdd2 = sc.textFile("../data/input/a.txt",5)
                                                                file_rdd3 = sc.textFile("../data/input/a.txt",100)
                                                                print("rdd2的分区数:",file_rdd2.getNumPartitions())
                                                                print("rdd3的分区数",file_rdd3.getNumPartitions())
                                                                

                                                                读取HDFS文件:

                                                                hdfs_rdd = sc.textFile("hdfs://node1:8020/input/b.txt")
                                                                print("hdfs_rdd分区数:",hdfs_rdd.getNumPartitions())
                                                                print("hdfs_rdd内容:",hdfs_rdd.collect())
                                                                
                                                                wholeTextFile

                                                                wholeTextFile()适合读取一堆小文件,这个API适合读取小文件,因此文件的数据很小,分区很多,导致shuffle的几率更高,所以尽量少分区读取数据

                                                                API:

                                                                • sparkcontext.wholeTextFile(参数1,参数2)
                                                                • 参数1:必填,文件夹的目录,支持本地,支持HDFS,也支持一些比如S3协议
                                                                • 参数2:可选,表示最小分区数量
                                                                • 注意:参数2话语权不足,这API的分区数量最多只能开到文件数量
                                                                  rdd = sc.wholeTextFiles("../data/input/tiny_files")
                                                                  print(rdd.map(lambda x:x[1]).collect())
                                                                  

                                                                  返回结果为二元组的形式展示, 前一个值是文件路径, 后一个值为文件内容

                                                                  2.2–RDD算子

                                                                  2.2.1–算子概念

                                                                  算子:分布式集合对象上的API称之为算子

                                                                  分类:

                                                                  • Transformation:转换算子
                                                                  • Action:动作(行动)算子

                                                                    转换算子:

                                                                    • 定义:RDD的算子,返回值仍然是一个RDD的,称之为转换算子
                                                                    • 特性:这类算子是lazy 懒加载的,如果没有action算子,Transformation算子是不工作的

                                                                      动作算子:

                                                                      • 定义:返回值不是RDD的就是action算子
                                                                        2.2.2–Transformation算子
                                                                        map

                                                                        map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD

                                                                        API:

                                                                        • rdd.map(func)
                                                                        • func : f:(T)=>U
                                                                        • f:表示这是一个函数(方法)
                                                                        • (T)=> U 表示方法传入参数为任意了类型,返回值也是任意类型
                                                                        • (A)=> A 表示传入参数为任意类型,返回值也是任意类型。但是传入值和返回值类型相同
                                                                          rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
                                                                          # 定义方法作为算子的传入
                                                                          def add(data):
                                                                              return data*10
                                                                          print(rdd.map(add).collect())
                                                                          # 更简单的方式是定义lamda表达式来写的匿名函数
                                                                          print(rdd.map(lambda data: data * 10).collect())
                                                                          # 两种方式都可以,第一种适合复杂函数,第二种适合一行解决的函数
                                                                          #结果
                                                                          [10, 20, 30, 40, 50, 60, 70, 80, 90]
                                                                          [10, 20, 30, 40, 50, 60, 70, 80, 90]
                                                                          
                                                                          flatMap

                                                                          flatMap对rdd执行map操作,然后进行解除嵌套操作.

                                                                          API:

                                                                          • rdd.flatMap(func)
                                                                          • func : f:(T)=>U

                                                                            eg:

                                                                            • 嵌套的list: list[[1,2,3,4,5,6],[7,8],[9]]
                                                                            • 解除嵌套的list: list[1,2,3,4,5,6,7,8,9]
                                                                              rdd = sc.parallelize(["hadoop spark hadoop","hadoop flink","spark hadoop"])
                                                                              rdd2 = rdd.map(lambda line: line.split(" "))
                                                                              # flatmap无需传参数就可以直接解除嵌套
                                                                              rdd3 = rdd.flatMap(lambda line:line.split(" "))
                                                                              #结果
                                                                              ['hadoop', 'spark', 'hadoop', 'hadoop', 'flink', 'spark', 'hadoop']
                                                                              
                                                                              reduceByKey

                                                                              针对(K,V)型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作

                                                                              API:

                                                                              • rdd.reduceByKey(func)
                                                                              • func:(v,v)=>v
                                                                              • 接受两个传入参数(类型一致),返回一个值,类型和传入的要求一致
                                                                                rdd = sc.parallelize([('a',1),('b',1),('c',1),('a',1),('a',1)])
                                                                                print(rdd.reduceByKey(lambda a, b: a + b).collect())
                                                                                #结果
                                                                                [('b', 1), ('c', 1), ('a', 3)]
                                                                                
                                                                                mapValues

                                                                                分区操作算子 就只对value 进行操作

                                                                                API:

                                                                                rdd.mapValue(func)

                                                                                rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('a',5)])
                                                                                # 就只对value 进行操作
                                                                                print(rdd.mapValues(lambda value: value * 10).collect())
                                                                                #结果
                                                                                
                                                                                groupBy

                                                                                将RDD的数据进行分组

                                                                                API:

                                                                                • rdd.groupBy(func)
                                                                                • func(T)=>K
                                                                                • 函数要求传入一个参数,返回一个返回值,类型无所谓
                                                                                • 这个函数是拿到你的输入值后,将所有相同输入值的放在一个组里
                                                                                • 分组完成后,每个组是一个二元组,key就是输入值,所有的数据都放入一个迭代器里作为value
                                                                                  rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
                                                                                  # groupby 传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可)
                                                                                  result_rdd = rdd.groupBy(lambda t:t[0])
                                                                                  print(result_rdd.collect())
                                                                                  # 将value的值强制转换,才能查看迭代器的内容
                                                                                  print(result_rdd.map(lambda t: (t[0], list(t[1]))).collect())
                                                                                  #结果
                                                                                  
                                                                                  filter

                                                                                  过滤想要的数据进行保留

                                                                                  API:

                                                                                  • rdd.filter(func)
                                                                                  • func(T)=>bool
                                                                                  • 传入任意类型的参数,返回值是False或者True
                                                                                    rdd = sc.parallelize([1,2,3,4,5])
                                                                                    # 通过filter算子过滤出奇数
                                                                                    result_rdd = rdd.filter(lambda x:x%2 == 1)
                                                                                    print(result_rdd.collect())
                                                                                    #结果
                                                                                    
                                                                                    distinct

                                                                                    对RDD数据进行去重,返回新的RDD

                                                                                    API:

                                                                                    • rdd.distinct(参数1)
                                                                                    • 参数1,去重分区数量,一般不用传
                                                                                      rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 1, 2, 5, 9, 3])
                                                                                      # 去重
                                                                                      print(rdd.distinct().collect())
                                                                                      #结果
                                                                                      
                                                                                      union

                                                                                      2个RDD合并成1个RDD返回

                                                                                      API:

                                                                                      • rdd.union(other_rdd)
                                                                                      • 注意只会合并不会去重
                                                                                      • 不同类型的RDD也可以混合
                                                                                      • 合并后的分区数是rdd1和rdd2的分区数和
                                                                                        rdd = sc.parallelize([1, 2, 3, 1, 2, 5, 9, 3])
                                                                                        rdd1 = sc.parallelize(["c", "b", "a"])
                                                                                        # 将两个rdd进行合并
                                                                                        print(rdd.union(rdd1).collect())
                                                                                        #结果
                                                                                        
                                                                                        join

                                                                                        分区操作算子 对两个RDD进行Join操作(可实现sql的内\外连接)

                                                                                        API:

                                                                                        • 注意:join算子只能用于二元元组

                                                                                        • rdd.join(other_rdd) 内连接

                                                                                        • rdd.leftOuterJoin(other_rdd)左外连接

                                                                                        • rdd.rightOuterJoin(other_rdd)右外连接

                                                                                          rdd1 = sc.parallelize([(1001,"zhangshan"),(1002,"lisi"),(1003,"wangwu"),(1004,"zhaoliu")])
                                                                                          rdd2 = sc.parallelize([(1001,"销售部"),(1002,"科技部")])
                                                                                          # 通过join来进行rdd间的关联
                                                                                          # 对于join来说,关联条件按照二元组key来进行关联
                                                                                          print(rdd1.join(rdd2).collect())
                                                                                          # 左外连接
                                                                                          print(rdd1.leftOuterJoin(rdd2).collect())
                                                                                          # 右外连接
                                                                                          print(rdd1.rightOuterJoin(rdd2).collect())
                                                                                          #结果
                                                                                          
                                                                                          intersection

                                                                                          求2个RDD的交集,返回一个新的rdd

                                                                                          API:

                                                                                          • rdd.intersection(other_rdd)
                                                                                            rdd1 = sc.parallelize([1,2,3,4,'a','b'])
                                                                                            rdd2 = sc.parallelize([2,3,4,'b'])
                                                                                            # 求rdd间的交集
                                                                                            print(rdd1.intersection(rdd2).collect())
                                                                                            #结果
                                                                                            
                                                                                            glom

                                                                                            将RDD的数据加上嵌套,这个嵌套按照分区来进行

                                                                                            API:

                                                                                            • rdd.glom()

                                                                                              eg:

                                                                                              • rdd数据为[1,2,3,4,5]有两个分区,那么glom后,数据就有可能变成[[1,2,3],[4,5]]
                                                                                                rdd = sc.parallelize([1,2,3,4,5,6,7,8],3)
                                                                                                # glom可以查看分区排布
                                                                                                print(rdd.glom().collect())
                                                                                                #结果
                                                                                                
                                                                                                groupByKey

                                                                                                针对KV型RDD,自动按照key分组

                                                                                                API:

                                                                                                • rdd.groupByKey()
                                                                                                  rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])
                                                                                                  # 与groupby相同,只是不需要指定 直接就是按照key分组的,
                                                                                                  # 并且第二个值是相同key的value的集合
                                                                                                  rdd1 = rdd.groupByKey()
                                                                                                  rdd2 = rdd1.map(lambda x:(x[0],list(x[1])))
                                                                                                  print(rdd2.collect())
                                                                                                  #结果
                                                                                                  [('b', [1, 1, 1]), ('a', [1, 1])]
                                                                                                  
                                                                                                  sortBy

                                                                                                  对RDD数据进行排序,基于你指定的排序依据

                                                                                                  API:

                                                                                                  • rdd.sortBy(func,ascending=False,numPartition=1)

                                                                                                  • func: (T)=>U 告知按照rdd中的哪个数据进行排序 比如:lambda x:x[1] 表示按照rdd中的第二列元素进行排序

                                                                                                  • ascending True升序 False降序

                                                                                                  • numPartition 用多少分区进行排序

                                                                                                    rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c', 1)],3)
                                                                                                    # 使用sortby对数据进行排序
                                                                                                    # 按照value进行排序
                                                                                                    # 注意 如果要全局有序 排序分区数应设为1
                                                                                                    rdd1 = rdd.sortBy(lambda x:x[1],ascending=True,numPartitions=1)
                                                                                                    print(rdd1.collect())
                                                                                                    #结果
                                                                                                    [('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 3)]
                                                                                                    
                                                                                                    sortByKey

                                                                                                    针对KV型RDD,按照key进行排序

                                                                                                    API:

                                                                                                    • sortByKey(ascending=True,numPartitions=None,keyfunc=
                                                                                                    • ascending:升序or降序,True升序, False降序,默认是升序
                                                                                                    • numPartitions:按照几个分区进行排序,如果全局有序,设置1
                                                                                                    • keyfunc:在排序前对key进行处理,语法是: (k)→ U ,一个参数传入,返回一个值
                                                                                                      rdd = sc.parallelize([('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)],3)
                                                                                                      # keyfunc = lambda key: str(key).lower() 忽略大小写的影响
                                                                                                      rdd1 = rdd.sortByKey(ascending=True,numPartitions=3,keyfunc=lambda key:str(key).lower())
                                                                                                      print(rdd1.collect())
                                                                                                      #结果
                                                                                                      [('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)]
                                                                                                      
                                                                                                      keys

                                                                                                      概述: rdd必须是键值对类型的数据, 获取键值对所有的键(key).

                                                                                                       
                                                                                                      
                                                                                                      values

                                                                                                      概述: rdd必须是键值对类型的数据, 获取键值对所有的值(value).

                                                                                                       
                                                                                                      
                                                                                                      2.2.3–Action算子
                                                                                                      countByKey

                                                                                                      统计key出现的次数(一般适用于KV型的RDD)

                                                                                                      rdd = sc.textFile("../data/input/a.txt")
                                                                                                      rdd2 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
                                                                                                      # 通过countbykey来对key进行计数 这是一个Action算子 不能colloct
                                                                                                      result = rdd2.countByKey()
                                                                                                      print(result)
                                                                                                      print(type(result))
                                                                                                      #结果
                                                                                                      defaultdict(, {'hadoop': 2, 'hello': 2, 'zxz': 2, 'wrx': 2})
                                                                                                      
                                                                                                      
                                                                                                      collect

                                                                                                      将RDD各个分区内的数据,统一收集到Driver中,形成List对象

                                                                                                      API:

                                                                                                      • rdd.collect()
                                                                                                      • 这个算子,是将RDD各个分区数据都拉取到Driver
                                                                                                      • 注意的是, RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明的了解结果数据集不会太大。不然,会把Driver内存撑爆
                                                                                                        reduce

                                                                                                        对RDD数据集按照传入的逻辑进行聚合

                                                                                                        API:

                                                                                                        • rdd.reduce(func)
                                                                                                        • func:(T,T)=>T
                                                                                                        • 两个参数传入,一个返回值,返回值和参数要求类型一致
                                                                                                          rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])
                                                                                                              # reduce是action算子
                                                                                                              result = rdd.reduce(lambda a,b:a+b)
                                                                                                              print(result)
                                                                                                          #结果
                                                                                                          45
                                                                                                          
                                                                                                          fold

                                                                                                          和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的,这个初始值聚合会作用在分区内和分区间

                                                                                                          API:

                                                                                                          • rdd,fold(初始值,func)

                                                                                                            eg:

                                                                                                            • 比如:[[1,2,3],[4,5,6],[7,8,9]]数据分布在3个分区
                                                                                                            • 分区1 123聚合的时候带上10作为初始值得到16
                                                                                                            • 分区2 456聚合的时候带上10作为初始值得到25
                                                                                                            • 分区3 789聚合的时候带上10作为初始值得到34
                                                                                                            • 3个分区的结果做聚合也带上初始值10,所以结果是: 10+16+ 25+34=85
                                                                                                              rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
                                                                                                              # print(rdd.glom().collect())
                                                                                                              result = rdd.fold(10,lambda a,b:a+b)
                                                                                                              print(result)
                                                                                                              #结果
                                                                                                              85
                                                                                                              
                                                                                                              aggregate

                                                                                                              和fold算子相同, 唯一不同的是可以分别定义分区内计算函数和分区外计算函数

                                                                                                                  rdd1 = sc.parallelize(range(5), 2)
                                                                                                                  print(rdd1.glom().collect())
                                                                                                                  # aggregate算子演示
                                                                                                                  seqOp = (lambda x,y:x+y)
                                                                                                                  comOp = (lambda x,y:x+y+1)
                                                                                                                  print(rdd1.aggregate(2, seqOp, comOp))
                                                                                                              
                                                                                                              • 下图所示的是三个算子执行时的计算过程, 这里使用的是累加的过程

                                                                                                              • fold和aggregate计算过程是一样的, 但是唯一区别是aggregate可以指定分区内外的计算方式不同.

                                                                                                              • 设置初始值的算子,初始值被加分区数n+1次

                                                                                                                first

                                                                                                                取出RDD第一个元素

                                                                                                                rdd = sc.parallelize([1,4,3,2,7,6,2,1,9])
                                                                                                                print(rdd.first())
                                                                                                                #结果
                                                                                                                1
                                                                                                                
                                                                                                                take

                                                                                                                取RDD的前N个元素,组合成List返回

                                                                                                                注意: take是将rdd中前n个元素拉取到Driver所在主机的内存中, 由于driver的内存是有限的, 数据过多会造成driver的内存溢出.

                                                                                                                print(rdd.take(4))
                                                                                                                #结果
                                                                                                                [1, 4, 3, 2]
                                                                                                                
                                                                                                                top

                                                                                                                对RDD数据集进行降序排序,取前N个

                                                                                                                print(rdd.top(3))
                                                                                                                #结果
                                                                                                                [9, 7, 6]
                                                                                                                
                                                                                                                count

                                                                                                                计算RDD有多少个数据,返回值是个数字

                                                                                                                print(rdd.count())
                                                                                                                #结果
                                                                                                                9
                                                                                                                
                                                                                                                takeSample

                                                                                                                随机抽样RDD的数据

                                                                                                                API:

                                                                                                                • takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
                                                                                                                • 参数1:True表示运行取同一个数据,False表示不允许取同一个数据. 和数据内容无关,是否重复表示的是同一个位置的数据
                                                                                                                • 参数2:抽样要几个
                                                                                                                • 参数3∶随机数种子,这个参数传入—个数字即可,随意给
                                                                                                                  rdd = sc.parallelize([1,2,1,4,1,6,7,1,2])
                                                                                                                  print(rdd.takeSample(False, 5,1))
                                                                                                                  #结果
                                                                                                                  [6, 7, 1, 1, 4]
                                                                                                                  
                                                                                                                  takeOrdered

                                                                                                                  对RDD进行排序取前N个

                                                                                                                  API:

                                                                                                                  • rdd.takeordered(参数1,参数2)
                                                                                                                  • 参数1要几个数据
                                                                                                                  • 参数2对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
                                                                                                                  • 这个方法使用按照元素自然顺序升序排序,如果你想玩倒叙,需要用参数2来对排序的数据进行处理
                                                                                                                    rdd = sc.parallelize([1, 2, 9, 2, 1, 5, 7, 3, 9])
                                                                                                                    # 默认是升序并选取前n个元素
                                                                                                                    print(rdd.takeOrdered(3))
                                                                                                                    # 增加匿名函数可以使其降序排序并选取前n个 == top
                                                                                                                    print(rdd.takeOrdered(3, lambda x: -x))
                                                                                                                    #结果
                                                                                                                    [1, 1, 2]
                                                                                                                    [9, 9, 7]
                                                                                                                    
                                                                                                                    foreach

                                                                                                                    对RDD的每一个元素,执行提供的逻辑操作(和map一样),但这个方法没有返回值

                                                                                                                    注意: 但是它执行的时候, 每个分区会抢占cpu资源,从而会导致数据乱序的情况

                                                                                                                    API:

                                                                                                                    • rdd.foreach(func)
                                                                                                                    • func(T)=>None
                                                                                                                      rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9])
                                                                                                                      result = rdd.foreach(lambda x: x * 10)
                                                                                                                      # foreach对元素进行处理后没有返回值
                                                                                                                      print(result)
                                                                                                                      # 这种方式的输出是由executor输出的而不是drive
                                                                                                                      result = rdd.foreach(lambda x: print(x * 10))
                                                                                                                      #结果
                                                                                                                      None
                                                                                                                      50
                                                                                                                      60
                                                                                                                      70
                                                                                                                      80
                                                                                                                      90
                                                                                                                      10
                                                                                                                      20
                                                                                                                      30
                                                                                                                      40
                                                                                                                      
                                                                                                                      saveAsTextFile

                                                                                                                      将RDD的数据写入文本文件,支持本地写出,hdfs等文件系统

                                                                                                                      • 注意:写出的时候,每个分区所在的Task直接控制数据写出到目标文件系统中,所以才会一个分区产生1个结果文件.
                                                                                                                        rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
                                                                                                                        # 将内容写到本地
                                                                                                                        # rdd.saveAsTextFile("../data/output/out2")
                                                                                                                        # 将内容保存到hdfs
                                                                                                                        rdd.saveAsTextFile("hdfs://node1:8020/input/output/out1")
                                                                                                                        # saveastextfile中是由executor来写入的
                                                                                                                        
                                                                                                                        • action算子中foreach和saveAsTextFile算子是分区直接执行的,其余的Action算子都回将结果发送至Driver
                                                                                                                          collectAsMap

                                                                                                                          概述: 将二元组的RDD转化为字典(dict). 将它转为rdd,就相当于将其转为本地,不再是弹性分布式数据集rdd了.

                                                                                                                           
                                                                                                                          
                                                                                                                          2.2.4–分区操作算子
                                                                                                                          mapPartition

                                                                                                                          Transformation算子 mapPartition—次被传递的是一整个分区的数据作为一个迭代器(一次性list)对象传入过来

                                                                                                                          rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
                                                                                                                          def process(iter):
                                                                                                                              result = list()
                                                                                                                              for it in iter:
                                                                                                                                  result.append(it*10)
                                                                                                                              return result
                                                                                                                          # mapParttion 是将一个分区的值按照list或者迭代器送到计算的地方
                                                                                                                          # 相比map 可以提高网络传送的性能
                                                                                                                          print(rdd.mapPartitions(process).collect())
                                                                                                                          #结果
                                                                                                                          [10, 20, 30, 40, 50, 60, 70, 80, 90]
                                                                                                                          
                                                                                                                          foreachPartition

                                                                                                                          Action算子 和普通的foreach一致,但是一次处理的是一整个分区数据

                                                                                                                          rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3)
                                                                                                                          def process(iter):
                                                                                                                              result = list()
                                                                                                                              for it in iter:
                                                                                                                                  result.append(it*10)
                                                                                                                              return print(result)
                                                                                                                          # 与mapPartition相同,但是不返回值
                                                                                                                          rdd.foreachPartition(process)
                                                                                                                          #结果
                                                                                                                          [10, 20, 30]
                                                                                                                          [40, 50, 60]
                                                                                                                          [70, 80, 90]
                                                                                                                          
                                                                                                                          • foreachPartition就是一个没有返回值的mapPartitions
                                                                                                                            partitionBy

                                                                                                                            Transformation算子 对RDD进行自定义分区

                                                                                                                            API:

                                                                                                                            • rdd.partitionBy(参数1,参数2)
                                                                                                                            • 参数1重新分区后有几个分区
                                                                                                                            • 参数2自定义分区规则,函数传入
                                                                                                                            • 参数2:(K)→ int 一个传入参数进来,类型无所谓,但是返回值一定是int类型将key传给这个函数,你自己写逻辑,决定返回一个分区编号
                                                                                                                            • 分区编号从0开始,不要超出分区数-1
                                                                                                                              rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('d',5)])
                                                                                                                              # 使用partitionby实现自定义分区
                                                                                                                              def process(key):
                                                                                                                                  if 'a' == key or 'b' ==key:
                                                                                                                                      return 0
                                                                                                                                  if 'c' == key:
                                                                                                                                      return 1
                                                                                                                                  return 2
                                                                                                                              print(rdd.partitionBy(3, process).glom().collect())
                                                                                                                              #结果
                                                                                                                              
                                                                                                                              repartition

                                                                                                                              Transformation算子 对RDD的分区执行重新分区(仅数量)

                                                                                                                              rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
                                                                                                                              # repartition修改分区
                                                                                                                              # 使用该API时,尽量降分区数,因为用该api会影响内存迭代管道
                                                                                                                              print(rdd.repartition(1).getNumPartitions())
                                                                                                                              print(rdd.repartition(5).getNumPartitions())
                                                                                                                              
                                                                                                                              coalesce

                                                                                                                              Transformation算子 对分区数量增减

                                                                                                                              API:

                                                                                                                              • rdd.coalesce(参数1,参数2)
                                                                                                                              • 参数1,分区数
                                                                                                                              • 参数2,True or False
                                                                                                                              • True表示允许shuffle,也就是可以加分区
                                                                                                                              • False表示不允许shuffle,也就是不能加分区,False是默认
                                                                                                                                # coalesce 修改分区 只有将shuffle改为true才能按照所给的数分区,
                                                                                                                                # 要不然会自动进行安全鉴定而进行分区
                                                                                                                                print(rdd.coalesce(1).getNumPartitions())
                                                                                                                                print(rdd.coalesce(5,shuffle=True).getNumPartitions())
                                                                                                                                

                                                                                                                                2.3–RDD缓存

                                                                                                                                2.3.1–RDD缓存的目的
                                                                                                                                • RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消失。
                                                                                                                                • RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。
                                                                                                                                • 这个特性可以最大化的利用资源,老l日RDD没用了就从内存中清理,给后续的计算腾出内存空间。
                                                                                                                                • RDD的缓存技术: Spark提供了缓存APIl,可以让我们通过调用API,将指定的RDD数据保留在内存或者硬盘上缓存的API
                                                                                                                                  2.3.1–RDD缓存的特点
                                                                                                                                  • 缓存技术可以将过程RDD数据持久化保存到内存或者硬盘上,但是,这个保存在设定上是认为不安全的。

                                                                                                                                  • 缓存的数据在设计上是认为有丢失风险的,所以缓存有一个特点就是:其保留RDD之间的血缘(依赖)关系

                                                                                                                                    一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据

                                                                                                                                  • 缓存如何丢失:在内存中的缓存是不安全的,比如断电\计算任务内存不足,把缓存清理给计算让路

                                                                                                                                    硬盘中因为硬盘损坏也是可能丢失的。

                                                                                                                                  • RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上,这是分散存储

                                                                                                                                    2.3.2–RDD缓存的API

                                                                                                                                    from pyspark.storagelevel import StorageLevel

                                                                                                                                    rdd3.cache() 缓存到内存中

                                                                                                                                    rdd3. persist(StorageLevel.MEMORY_ONLY) 仅内存缓存

                                                                                                                                    rdd3. persist(StorageLevel.MEMORY_ONLY_2) 仅内存缓存,2个副本

                                                                                                                                    rdd3. persist(StorageLevel.DISK_ONLY) 仅缓存硬盘上

                                                                                                                                    rdd3.persist(StorageLevel.DISK_ONLY_2) 仅缓存硬盘上,2个副本

                                                                                                                                    rdd3.persist(storageLevel.DISK_ONLY_3) 仅缓存硬盘上,3个副本

                                                                                                                                    rdd3.persist(StorageLevel.MEMORY_AND_DISK) 先放内存,不够放硬盘

                                                                                                                                    rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)先放内存,不够放硬盘,2个副本

                                                                                                                                    rdd3.persist(StorageLevel.OFF_HEAP) 堆外内存(系统内存)

                                                                                                                                    如上API,自行选择使用即可

                                                                                                                                    一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)

                                                                                                                                    如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK_ONLY)或者就别用缓存了用CheckPoint

                                                                                                                                    主动清理缓存的API rdd . unpersist()

                                                                                                                                    from pyspark import SparkContext, SparkConf
                                                                                                                                    from pyspark.storagelevel import StorageLevel
                                                                                                                                    if __name__ == '__main__':
                                                                                                                                        conf = SparkConf().setAppName("Text").setMaster("local[*]")
                                                                                                                                        sc = SparkContext(conf=conf)
                                                                                                                                        rdd1 = sc.textFile("../data/input/a.txt")
                                                                                                                                        rdd2 = rdd1.flatMap(lambda x:x.split(" "))
                                                                                                                                        rdd3 = rdd2.map(lambda x:(x,1))
                                                                                                                                        # 增加缓存
                                                                                                                                        rdd3.cache()
                                                                                                                                        # 自定义缓存
                                                                                                                                        rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)
                                                                                                                                        rdd4 = rdd3.reduceByKey(lambda a,b:a+b)
                                                                                                                                        print(rdd4.collect())
                                                                                                                                        rdd5 = rdd3.groupByKey()
                                                                                                                                        rdd6 = rdd5.mapValues(lambda x:sum(x))
                                                                                                                                        print(rdd6.collect())
                                                                                                                                        # 解除缓存
                                                                                                                                        rdd3.unpersist()
                                                                                                                                    
                                                                                                                                    2.3.3–RDD的CheckPoint

                                                                                                                                    CheckPoint技术,也是将RDD的数据,保存起来,但是它仅支持硬盘存储,并且,它被设计认为是安全的不保留血缘关系

                                                                                                                                    特点:

                                                                                                                                    • CheckPoint存储RDD数据,是集中收集各个分区数据进行存储,而缓存是分散存储
                                                                                                                                    • CheckPoint不管分区数量多少,风险是一样的,缓存分区越多,风险越高
                                                                                                                                    • CheckPoint支持写入HDFS,缓存不行, HDFS是高可靠存储, CheckPoint被认为是安全的.
                                                                                                                                    • CheckPoint不支持内存,缓存可以,缓存如果写内存性能比CheckPoint要好一些
                                                                                                                                    • CheckPoint因为设计认为是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留

                                                                                                                                      使用:

                                                                                                                                      CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用CheckPoint比较合适,或者数据量很大,用CheckPoint比较合适.

                                                                                                                                      如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要,直接缓存即可.

                                                                                                                                      Cache和CheckPoint两个API都不是Action类型,所以想要它俩工作,必须在后面接上Action,接上Action的目的,是让RDD有数据,而不是为了让checkPoint和Cache工作。

                                                                                                                                      from pyspark import SparkContext, SparkConf
                                                                                                                                      from pyspark.storagelevel import StorageLevel
                                                                                                                                      if __name__ == '__main__':
                                                                                                                                          conf = SparkConf().setAppName("Text").setMaster("local[*]")
                                                                                                                                          sc = SparkContext(conf=conf)
                                                                                                                                          # 1、告知spark开启checkpoint功能
                                                                                                                                          sc.setCheckpointDir("hdfs://node1:8020/input/output/ckp")
                                                                                                                                          rdd1 = sc.textFile("../data/input/a.txt")
                                                                                                                                          rdd2 = rdd1.flatMap(lambda x:x.split(" "))
                                                                                                                                          rdd3 = rdd2.map(lambda x:(x,1))
                                                                                                                                          # 2、调用checkpoint的api 保存数据即可
                                                                                                                                          rdd3.checkpoint()
                                                                                                                                          rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
                                                                                                                                          print(rdd4.collect())
                                                                                                                                          rdd5 = rdd3.groupByKey()
                                                                                                                                          rdd6 = rdd5.mapValues(lambda x: sum(x))
                                                                                                                                          print(rdd6.collect())
                                                                                                                                          # 解除缓存
                                                                                                                                          rdd3.unpersist()
                                                                                                                                      

                                                                                                                                      2.4–广播变量

                                                                                                                                      2.4.1–概念

                                                                                                                                      本地list对象,被发送到每个分区的处理线程上使用,也就是一个executor内,其实存放了2份一样的数据。

                                                                                                                                      executor是进程,进程内资源共享,这2份数据没有必要,造成了内存浪费。

                                                                                                                                      如果将本地list对象标记为广播变量对象,那么当上述场景出现的时候,Spark只会,给每个Executor来一份数据,而不是像原本那样,每一个分区的处理线程都来一份,节省内存.

                                                                                                                                      2.4.2–API

                                                                                                                                      1.将本地list标记成广播变量是即可

                                                                                                                                      broadcast = sc. broadcast(stu_info_list)

                                                                                                                                      2.使用广播变量,从broadcast对象中取出本地list对象即可value = broadcast.value

                                                                                                                                      • 也就是先放进去broadcast内部,然后从broadcast内部在取出来用,中间传输的是broadcast这个对象了
                                                                                                                                      • 只要中间传翰的是broadcast对象,spark就会留意,只会给每个Executor发一份了,而不是傻傻的哪个分区要都给.
                                                                                                                                        from pyspark import SparkContext,SparkConf
                                                                                                                                        if __name__ == '__main__':
                                                                                                                                            conf = SparkConf().setAppName("text").setMaster("local[*]")
                                                                                                                                            sc = SparkContext(conf=conf)
                                                                                                                                            stu_info_list = {
                                                                                                                                                (1,'zxz',11),
                                                                                                                                                (2,'wrx',13),
                                                                                                                                                (3,'小明',11),
                                                                                                                                                (4,'王大力',11)
                                                                                                                                            }
                                                                                                                                            # 1.将本地python list对象标记为广播变量
                                                                                                                                            broadcast = sc.broadcast(stu_info_list)
                                                                                                                                            score_info_rdd = sc.parallelize([
                                                                                                                                                (1,'语文',99),
                                                                                                                                                (2,'数学',99),
                                                                                                                                                (3,'英语',99),
                                                                                                                                                (4,'数学',99),
                                                                                                                                                (2,'编程',99),
                                                                                                                                                (4,'语文',99),
                                                                                                                                                (1,'数学',99)
                                                                                                                                            ])
                                                                                                                                            def map_func(data):
                                                                                                                                                id = data[0]
                                                                                                                                                name = ''
                                                                                                                                                # 匹配分布list和rdd分布式的id
                                                                                                                                                # 2.在使用到本地集合对象的地方,从广播变量中取出来用即可
                                                                                                                                                for stu_info in broadcast.value:
                                                                                                                                                    stu_id = stu_info[0]
                                                                                                                                                    if id == stu_id:
                                                                                                                                                        name = stu_info[1]
                                                                                                                                                return (name,data[1],data[2])
                                                                                                                                            print(score_info_rdd.map(map_func).collect())
                                                                                                                                            """
                                                                                                                                            场景:本地集合对象和分布式对象(RDD)进行关联的时候
                                                                                                                                            需要将本地集合封装为广播变量
                                                                                                                                            可以节省:
                                                                                                                                            1.网络io的次数
                                                                                                                                            2.Executor的内存占用
                                                                                                                                            """
                                                                                                                                            #结果
                                                                                                                                            
                                                                                                                                        

                                                                                                                                        2.5–累加器

                                                                                                                                        2.5.1–需求

                                                                                                                                        想要对map算子计算中的数据,进行计数累加,得到全部数据计算完后的累加结果

                                                                                                                                        2.5.2–没有累加器的代码演示
                                                                                                                                        from pyspark import SparkContext,SparkConf
                                                                                                                                        if __name__ == '__main__':
                                                                                                                                            conf = SparkConf().setAppName("text").setMaster("local[*]")
                                                                                                                                            sc = SparkContext(conf=conf)
                                                                                                                                            rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
                                                                                                                                            count = 0
                                                                                                                                            def map_func(data):
                                                                                                                                                global count
                                                                                                                                                count += 1
                                                                                                                                                print(count)
                                                                                                                                            rdd.map(map_func).collect()
                                                                                                                                            print(count)
                                                                                                                                            # 可以看出count最外面的值没有变化
                                                                                                                                             #结果
                                                                                                                                             0
                                                                                                                                            
                                                                                                                                        
                                                                                                                                        2.5.3–增加累加器的代码

                                                                                                                                        原因:

                                                                                                                                        count来自driver对象,当在分布式的map算子中需要count对象的时候,driver会将count对象发送给每一个executor一份(复制发送),每个executor各自收到一个,在最后执行print(count)的时候,这个被打印的count依旧是driver的那个,所以不管executor中累加到多少,都和driver这个count无关。

                                                                                                                                        API:

                                                                                                                                        • sc.accumulator(初始值)
                                                                                                                                        • 这个对象唯一和前面提到的count不同的是
                                                                                                                                        • 这个对象可以从各个Executor中收集它们的执行结果,作用回自己身上
                                                                                                                                          from pyspark import SparkContext,SparkConf
                                                                                                                                          if __name__ == '__main__':
                                                                                                                                              conf = SparkConf().setAppName("text").setMaster("local[*]")
                                                                                                                                              sc = SparkContext(conf=conf)
                                                                                                                                              rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
                                                                                                                                              # spark提供的累加器变量,参数是初始值
                                                                                                                                              scmlt = sc.accumulator(0)
                                                                                                                                              def map_func(data):
                                                                                                                                                  global scmlt
                                                                                                                                                  scmlt += 1
                                                                                                                                                  # print(scmlt)
                                                                                                                                              rdd2 = rdd.map(map_func)
                                                                                                                                              rdd2.collect()
                                                                                                                                              rdd3 = rdd2.map(lambda x:x)
                                                                                                                                              rdd3.collect()
                                                                                                                                              print(scmlt)
                                                                                                                                              #结果
                                                                                                                                              20
                                                                                                                                              
                                                                                                                                          

                                                                                                                                          如上代码,第一次rdd2被action后,累加器值是10,然后rdd2就没有了(没数据了),当rdd3构建出来的时候,是依赖rdd2的, rdd2没数据,那么rdd2就要重新生成.重新生成就导致累加器累加数据的代码再次被执行,所以代码的结果是20。

                                                                                                                                          注意事项:

                                                                                                                                          也就是,使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用可能会重新构建此rdd如果累加器累加代码,存在重新构建的步骤中,累加器累加代码就可能被多次执行。

                                                                                                                                          如何解决:加缓存或者checkPoint即可.

                                                                                                                                          2.6–python程序对hdfs文件操作

                                                                                                                                          from hdfs.client import Client
                                                                                                                                          client = Client('http://node1:9870')
                                                                                                                                          # 创建文件夹
                                                                                                                                          client.makedirs('/datas/aaa')
                                                                                                                                          # 删除文件夹
                                                                                                                                          client.delete('/datas/output')
                                                                                                                                          # 上传文件
                                                                                                                                          client.upload('/datas/aaa/a.txt','E:/aa/a.txt')
                                                                                                                                          # 下载文件
                                                                                                                                          client.download('/datas/a.txt','E:/a.txt')
                                                                                                                                          

                                                                                                                                          2–SparkSQL

                                                                                                                                          3.1–DataFrame

                                                                                                                                          3.1.1–SparkSession对象环境的创建
                                                                                                                                          • SparkSession对象
                                                                                                                                            # Sparksession对象的导包
                                                                                                                                            from pyspark.sql import SparkSession
                                                                                                                                            if __name__ == '__main__':
                                                                                                                                                # 构建session环境入口
                                                                                                                                                spark = SparkSession.builder.appName("test").master('local[*]') \
                                                                                                                                                    .getOrCreate()
                                                                                                                                            
                                                                                                                                            • 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
                                                                                                                                              # 通过SparkSession对象获取SparkContext对象
                                                                                                                                              sc = spark.sparkContext
                                                                                                                                              
                                                                                                                                              • 用于SparkSQL编程作为入口对象
                                                                                                                                                # SparkSql的helloworld
                                                                                                                                                df = spark.read.csv("../data/input/stu_score.txt", sep=',', header=False)
                                                                                                                                                df2 = df.toDF("id", "name", "score")
                                                                                                                                                df2.printSchema()
                                                                                                                                                df2.show()
                                                                                                                                                df2.createTempView("score")
                                                                                                                                                # SQL风格
                                                                                                                                                spark.sql("""
                                                                                                                                                    SELECT * FROM score WHERE name='语文' LIMIT 5   
                                                                                                                                                """).show()
                                                                                                                                                
                                                                                                                                                • 所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象
                                                                                                                                                  3.1.2–DataFrame的组成

                                                                                                                                                  DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:

                                                                                                                                                  • 表结构描述

                                                                                                                                                    比如,在MySQL中的一张表:

                                                                                                                                                    • 由许多行组成
                                                                                                                                                    • 数据也被分成多个列
                                                                                                                                                    • 表也有表结构信息(列、列名、列类型、列约束等)

                                                                                                                                                      基于这个前提,DataFrame的组成如下:

                                                                                                                                                      在结构层面:

                                                                                                                                                      • StructType对象描述整个DataFrame的表结构
                                                                                                                                                      • StructField对象描述一个列的信息

                                                                                                                                                        在数据层面:

                                                                                                                                                        • Row对象记录一行数据
                                                                                                                                                        • Column对象记录一列数据并包含列的信息

                                                                                                                                                          一个StructField记录:列名、列类型、列是否运行为空

                                                                                                                                                          多个StructField组成一个StructType对象。

                                                                                                                                                          一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空

                                                                                                                                                          同时,一行数据描述为Row对象,如Row(1, 张三, 11)

                                                                                                                                                          一列数据描述为Column对象,Column对象包含一列数据和列的信息

                                                                                                                                                          3.1.3–DataFrame代码构建
                                                                                                                                                          基于RDD方式1

                                                                                                                                                          DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构

                                                                                                                                                          API:

                                                                                                                                                          • spark.createDataFrame(data_rdd,schema=[‘name’,‘age’])
                                                                                                                                                          • 参数1 被转换的rdd
                                                                                                                                                          • 参数2 通过list的形式按照顺序依次提供字符串名称即可
                                                                                                                                                            from pyspark.sql import SparkSession
                                                                                                                                                            if __name__ == '__main__':
                                                                                                                                                                # 构建执行环境入口对象SparkSession
                                                                                                                                                                spark = SparkSession.builder.appName('test').master('local[*]').\
                                                                                                                                                                    getOrCreate()
                                                                                                                                                                sc = spark.sparkContext
                                                                                                                                                                # 基于rdd转换成Dataframe类型
                                                                                                                                                                file_rdd = sc.textFile('../data/input/sql/people.txt')
                                                                                                                                                                data_rdd = file_rdd.map(lambda x:x.split(', ')).map(lambda x:(x[0],int(x[1])))
                                                                                                                                                                # 构建Dataframe对象
                                                                                                                                                                # 参数1 被转换的rdd
                                                                                                                                                                # 参数2 通过list的形式按照顺序依次提供字符串名称即可
                                                                                                                                                                df = spark.createDataFrame(data_rdd,schema=['name','age'])
                                                                                                                                                                # 打印DataFrame的表结构
                                                                                                                                                                df.printSchema()
                                                                                                                                                                # 打印df的数据
                                                                                                                                                                # 参数1:展示多少条数据,默认不传的话是20
                                                                                                                                                                # 参数2:表示是否对列进截断,如果列的数据超过20个字符串长度,
                                                                                                                                                                # 后续的内容不显示,以。。。代替,如果给False,表示不截断全部显示,默认是True
                                                                                                                                                                df.show(20,False)
                                                                                                                                                                # 将df对象转换成临时视图表,可供sql语句查询
                                                                                                                                                                df.createOrReplaceTempView("people")
                                                                                                                                                                spark.sql("""
                                                                                                                                                                    SELECT * FROM people WHERE age
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]