spark中pyspark :add

07-02 568阅读

一、RDD的介绍(了解)

RDD:resilient distributed dataset(弹性分布式数据集合 ) spark的计算核心,spark采用rdd管理数据

spark中pyspark :add
(图片来源网络,侵删)
  • RDD

    • RDD是spark的一种数据模型(规定数据的存储结构和计算方法)

    • python中的数据模型

      • list [] 可以重复存储数据 append

      • set{} 不允许重复存储

      • dict {k:v} get(key)

    • RDD的模型可以对内存数进行共享管理

  • 分布式

    • 数据可以在多台服务器上同时计算执行

  • 弹性

    • 可以根据计算的需求将数据进行分区拆分,本质就是将数据分成多份

    二、RDD的特点(了解)

    • 分区

      • 可以将计算的海量数据分成多份,需要分成多少可分区可以通过方法指定

      • 每个分区都可以对应一个task线程执行计算

    • 只读

      • rdd中的数据不能直接修改,需要通过方法计算后得到一个新的rdd

      • rdd本身存储的数只能读取

    • 依赖

      • rdd之间是有依赖关系的

      • 新的rdd是通过旧的rdd计算得到

    • 缓存

      • 可以将计算的中结果缓存起来,如果后续计算错误时,可以从缓存位置重新计算

      • 将数据存储在内存或本地磁盘

      • 作用是容错

      • 缓存在执行计算任务程序结束后会释放删除

    • checkpoint

      • 作用和缓存一样

      • checkpoint可以将数据存储在分布式存储系统中,比如hdfs

      三、创建RDD数据(掌握)

      将需要计算的数据转为rdd的数据,就可以利用spark的内存计算方法进行分布式计算操作,这些计算方法就是有rdd提供的

      rdd数据的转化方法是有sparkcontext提供的,所以需要先生成sparkcontext,sparkcontext中还包含资源申请和任务划分功能

      SparkContext称为Spark的入口类

      3-1 Python数据转化为rdd

      # 导入sparkcontext
      from pyspark import SparkContext
      ​
      # 创建SparkContext对象
      sc = SparkContext()
      ​
      # 将Python数据转为rdd
      # data_int = 10  # 数值类型不能转化rdd
      # 能for循环遍历的数据都能转为rdd
      data_str = 'abc'
      data_list = [1, 2, 3, 4]
      data_dict = {'a': 1, 'b': 2}
      data_set = {1, 2, 3, 4}
      data_tuple = (1, 2, 3, 4)
      rdd = sc.parallelize(data_tuple)
      ​
      # rdd的计算
      ​
      ​
      # rdd的数据输出展示
      # 获取所有rdd数据
      res = rdd.collect()
      print(res)

      3-2 文件数据(hdfs)转化为rdd

      # 将读取的hdfs文件数据转为rdd
      from pyspark import SparkContext
      ​
      # 生成SparkContext类对象
      sc = SparkContext()
      ​
      # 读取文件数据转为rdd
      rdd1  = sc.textFile('hdfs://node1:8020/data')
      rdd2  = sc.textFile('/data/words.txt')
      ​
      # 查看数据
      res = rdd1.collect()
      print(res)
      res = rdd2.collect()
      print(res)
      ​

      3-3 rdd的分区

      • python数据转发的分区数指定

        # RDD分区使用
        # 导入sparkcontext
        from pyspark import SparkContext
        ​
        # 创建SparkContext对象
        sc = SparkContext()
        ​
        # 创建生成rdd是可以指定分区数
        # Python数据转为rdd指定
        # numSlices 可以指定分区数
        rdd_py = sc.parallelize([1,2,3,4,5,6],numSlices=10)
        ​
        ​
        # rdd计算
        ​
        # 查看rdd分区数据
        res1  = rdd_py.glom().collect()
        print(res1)
        ​
        ​
        • 读取的文件数据进行分区数指定

          # RDD分区使用
          # 导入sparkcontext
          from pyspark import SparkContext
          ​
          # 创建SparkContext对象
          sc = SparkContext()
          ​
          # 创建生成rdd是可以指定分区数
          # file文件读取数据指定分区数据
          # minPartitions 指定分区
          # 文件大小/分区数  = 值 -----余数
          # 余数/值 * 100%=百分比    百分比大于10% 会多创建一个分区
          rdd_file = sc.textFile('hdfs://node1:8020/data',minPartitions=1)
          # 在spark并行度部分会讲解如何根据资源设置分区数
          ​
          # rdd计算
          ​
          # 查看rdd分区数据
          ​
          res2  = rdd_file.glom().collect()
          print(res2)
          ​

          3-4 小文件数据读取

          300M 3个块 对应三个分区

          在一个目录下,有多个文件,如果文件的大小不够一个块的大小,一个文件就对应一个分区,文件超过一个块,那就一个block(128M)块对应一个分区。

          目录下都是小文件,那么读取目录下的文件数据,会对应很多个分区


          一个分区对应一个task线程,当小文件过多时,会占用大量的线程,造成资源浪费

          使用wholeTextFiles方法可以解决

          该方法会现将读取到的数据合并在一起,然后重新进行分区

          # 导入sparkcontext
          from pyspark import SparkContext
          ​
          # 创建SparkContext对象
          sc = SparkContext(master='yarn')
          # rdd = sc.textFile('hdfs://node1:8020/data')
          # rdd计算
          # wholeTextFiles 会合并小文件数据
          # minPartitions 指定分区数
          rdd_mini = sc.wholeTextFiles('hdfs://node1:8020/data',minPartitions=1)
          ​
          # 展示数据
          # res1 = rdd.glom().collect()
          # print(res1)
          ​
          res2 = rdd_mini.glom().collect()
          print(res2)

          java.lang.NoSuchMethodError

          java包类的冲突

          node1操作 同步到node2和node3

          四、常用RDD算子(掌握)

          将数据转化为rdd之后,就需要进行rdd的计算了,rdd提供了计算方法

          rdd的方法又称为rdd算子

          4-1 算子(方法)介绍

          rdd中封装了各种算子方便进行计算,主要分为两类

          • transformation

            • 转化算子 对rdd数据进行转化计算得到新的rdd ,定义了一个线程任务

          • action

            • 执行算子 触发计算任务,让计算任务进行执行,得到结果

            • 触发线程执行的

            rdd的转化算子大部分都是从rdd中读取元素数据(rdd中每条数据),具体计算需要开发人员编写函数传递到rdd算子中

            rdd的执行算子则大部分是用来获取数据 collect方法就是触发算子

            4-2 常用transformation算子(掌握)

            • map

              • rdd.map(lambda 参数:参数计算)

              • 参数接受每个元素数据

              # 转化算子map的使用
              from pyspark import SparkContext
              ​
              # 创建SparkContext对象
              sc = SparkContext()
              ​
              # 生成rdd
              data = [1, 2, 3, 4]
              rdd = sc.parallelize(data)
              ​
              ​
              # 对rdd进行计算
              # 转化算子map使用
              # 将处理数据函数当成参数传递给map
              # 定义函数只需要一个接受参数
              def func(x):
                  """
                      数据计算逻辑函数
                  :param x: 接收每一个rdd的元素数据
                  :return:
                  """
                  return x + 1
              ​
              ​
              def func2(x):
                  """
                      数据计算逻辑函数
                  :param x: 接收每一个rdd的元素数据
                  :return:
                  """
                  return str(x)
              ​
              ​
              # 转化算子执行后会返回新的rdd
              rdd_map = rdd.map(func)
              rdd_map2 = rdd.map(func2)
              rdd_map3 = rdd_map2.map(lambda x: [x])
              ​
              # 对rdd数据结果展示
              # 使用rdd的触发算子,collect获取是所有的rdd元素数据
              res = rdd_map.collect()
              print(res)
              ​
              res2 = rdd_map2.collect()
              print(res2)
              ​
              ​
              res3 = rdd_map3.collect()
              print(res3)
              ​
              • flatMap

                • 处理的是二维嵌套列表数据 [[1,2,3],[4,5,6],[7,8,9]] [1,2,3,4]

                • rdd.flatMap(lambda 参数:[参数计算])

                from pyspark import SparkContext
                ​
                # 创建SparkContext对象
                sc = SparkContext()
                ​
                # 生成rdd
                data = [[1, 2], [3, 4]]
                data2 = ['a,b,c','d,f,g']  # 将数据转为['a','b','c','d','f','g']
                rdd = sc.parallelize(data)
                rdd2 = sc.parallelize(data2)
                ​
                # rdd计算
                # flatMap算子使用  将rdd元素中的列表数依次遍历取出对应的值放入新的rdd [1,2,3,4]
                # 传递一个函数,函数接受一个参数
                rdd_flatMap = rdd.flatMap(lambda x: x)
                ​
                rdd_map = rdd2.map(lambda x:x.split(','))
                rdd_flatMap2 = rdd_map.flatMap(lambda x:x)
                ​
                # 输出展示数据
                # 使用执行算子
                res  = rdd_flatMap.collect()
                print(res)
                ​
                res2  = rdd_map.collect()
                print(res2)
                ​
                res3 = rdd_flatMap2.collect()
                print(res3)
                • fliter

                  • rdd.filter(lambda 参数:参数条件过滤)

                  • 条件过滤的书写和Python中if判断一样

                  # 3、过滤算子
                  rdd7 = sc.parallelize([1, 2, 3, 4])
                  rdd8 = sc.parallelize(['a', 'b', 'c', 'a'])
                  # filter算子,可以接受rdd中每个元素数据,然后传递给函数进行过滤
                  # lambda需要有一个接收值x,x接收到每个元素数据后,如何进行过滤需要写判断逻辑
                  ​
                  # 判断条件的书写逻辑和if的判断逻辑一样
                  filter_rdd = rdd7.filter(lambda x: x > 2)
                  filter_rdd2 = rdd8.filter(lambda x: x == 'a')
                  • distinct 去重

                    • 不需要lambda rdd.distinct

                    # 4、去重
                    # distinct 会对rdd中的重复数据进行去重,去重后会返回一个新的rdd
                    distinct_rdd = rdd8.distinct()
                    
                    • groupBy 分组

                      • rdd.groupBy(lambda 参数:根据参数编写分组条件)

                      • mapValues(list)

                      # 5、对数据进行分组
                      # groupBy是分组算子,会读取rdd中每个元素数据,传递给函数使用
                      # lambda需要一个接收值x,接收groupBy传递的元素数据,然后指定分组规则
                      # hash(x) % 2  对x中的元素数据进行hash取余,将数据分成两组,余数相同的数据会放在一起
                      # groupBy返回一个新的rdd,rdd的结构形式是  [(key,value),(k,v)]
                      groupBy_rdd = rdd8.groupBy(lambda x: hash(x) % 2)
                      # 6、对kv形式的数据进行取值处理
                      # mapValues,可以获取kv中的value值部分传递给函数进行使用
                      # mapValues返回一个新的rdd数据
                      mapValues_rdd = groupBy_rdd.mapValues(lambda x: list(x))
                      
                      • k-v数据 [(k,v),(k1,v1)]

                        • groupByKey()

                          • rdd.groupByKey()

                        • reduceByKey()

                          • rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)

                        • sortByKey()

                          • rdd.sortByKey()

                        # 6-2 对kv形式的数据进行分组,系统key值得数据会放在一起
                        rdd9 = sc.parallelize([('a', 1), ('b', 2), ('c', 3), ('a', 1), ('b', 2)])
                        # 不需要传递处理函数,返回一个新的rdd
                        groupByKey_rdd = rdd9.groupByKey()
                        mapValues_rdd2 = groupByKey_rdd.mapValues(lambda x: list(x))
                        # 6-3 对kv形式的数据先进行分组,在进行聚合计算
                        # reduceByKey会将相同key的数据放在一起,然后对每个key中对应的valeu进行累加计算
                        # reduceByKey会将分组后的数据,按照key值传递个函数进行计算
                        # lambda需要接受两个参数,后面编写累加计算  x=0  y=3  x+y=3    x=3,y=0  res=x+y=3
                        reduceByKey_rdd = rdd9.reduceByKey(lambda x, y: x + y)
                        # 6-4 对kv形式的数据先进行排序
                        # 不需要指定函数,按照key排序,默认升序
                        sortByKey_rdd= rdd9.sortByKey()
                        sortByKey_rdd2= rdd9.sortByKey(ascending=False)
                        
                        • sortBy() 排序

                          • rdd.sortBy(lambda x:x,ascending=False)

                          # 排序算子
                          # sortBy 可以指定按照哪个数据进行排序
                          # sortBy会将rdd中的元素数据传递给函数使用
                          # lambda 需要一个接受值x,接受rdd中每个元素
                          # 如果元素是kv类型可以通过下标方式指定按照那种排序 x[0] 代表key x[1] 代表value值
                          # 默认升序
                          sortBy_rdd = rdd9.sortBy(lambda x: x[1])
                          # 降序
                          sortBy_rdd2 = rdd9.sortBy(lambda x: x[1],ascending=False)
                          

                          4-3 常用action算子(掌握)

                          • collect() 取出rdd中所有值

                            • rdd.collect()

                          • reduce() 非k-v类型数据累加 [1,2,3,4,6]

                            • rdd.reduce(lambda 参数1,参数2:两个参数计算)

                          • count() 统计rdd元素个数

                            • rdd.count()

                          • take() 取出指定数量值

                            • rdd.take(数量)

                            # 执行算子的使用
                            from pyspark import SparkContext
                            sc = SparkContext()
                            # python转为rdd
                            rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
                            # transformation算的转化计算
                            # map,flatMap,fliter等
                            # 触发计算
                            # action算子计算完成返回的是计算结果,不在是rdd了,不能在进行rdd操作了
                            # collect方法,触发计算获取是所有计算结果
                            res = rdd.collect()
                            print(res)
                            # reduce方法,传递一个计算逻辑,对元素数据进行累加计算
                            # 可以不需要转化算直接累加计算,但是不能处理kv形式数据
                            res  = rdd.reduce(lambda x,y:x+y)
                            # x初始为0 y一次获取元素数据   x=0,y=1 x= x+y=1
                            # x=1,y=2  x+y=3
                            # x=3,y=3  x+y=6
                            print(res)
                            # count 获取rdd元素个数
                            res = rdd.count()
                            print(res)
                            # take取指定数量的元素数据
                            res=rdd.take(3)
                            print(res)
                            

                            4-4 词频统计案例(掌握)

                            # 词频统计
                            # 导入sparkcontext
                            from pyspark import SparkContext
                            # 创建SparkContext对象
                            sc = SparkContext()
                            # 将hdfs的文件数据读取后转为rdd
                            # 第一个参数 指定读取的文件路径
                            # rdd = sc.textFile('hdfs://node1:8020/data')
                            # 简写  有的会读取错误,当错误是就写完整
                            # rdd = sc.textFile('/data')
                            # 读取某单独文件
                            rdd = sc.textFile('hdfs://node1:8020/data/words.txt')
                            # rdd计算
                            # 对读取到的rdd中的每行数据,先进行切割获取每个单词的数据
                            # rdd_map = rdd.map(lambda x: x.split(','))
                            rdd_flatMap= rdd.flatMap(lambda x: x.split(','))
                            # 将单词数据转化为k-v结构数据   [(k,v),(k1,v1)]   给每个单词的value一个初始值1
                            rdd_map_kv = rdd_flatMap.map(lambda x:(x,1))
                            # 对kv数据进行聚合计算  hive:[1,1]  求和  求平均数  求最大值  求最小值
                            rdd_reduceByKey =  rdd_map_kv.reduceByKey(lambda x,y:x+y)  # 现将相同key值的数据放在一起,然后对相同key值内的进行累加
                            # 展示数据
                            res = rdd.collect()
                            print(res)
                            # res2 = rdd_map.collect()
                            # print(res2)
                            res3 = rdd_flatMap.collect()
                            print(res3)
                            res4 = rdd_map_kv.collect()
                            print(res4)
                            res5 = rdd_reduceByKey.collect()
                            print(res5)
                            # [('hadoop',1),('flink',1),('spark',2),('hive',2)]
                            

                            4-5 其他高级算子

                            • 多个rdd的方法

                              • union 合并两个rdd 不去重

                              • join k-v类型数据 通过key进行关联

                              # 多个rdd操作
                              from pyspark import SparkContext
                              sc = SparkContext()
                              rdd1 = sc.parallelize([1,2,3,4])
                              rdd2 = sc.parallelize([5,6,7,4])
                              rdd_kv1 = sc.parallelize([('a',1),('b',2),('c',3)])
                              rdd_kv2 = sc.parallelize([('c',4),('d',5),('e',6)])
                              # rdd之间的合并
                              # rdd1和并rdd2,合并后会返回先的rdd
                              union_rdd = rdd1.union(rdd2)
                              # rdd3和并rdd4,合并后会返回先的rdd
                              union_kv_rdd = rdd_kv1.union(rdd_kv2)
                              # kv形式rdd进行join关联  通过key关联
                              # 内关联  相同key的数据会保留下来
                              join_rdd = rdd_kv1.join(rdd_kv2)
                              # 左关联   左边rdd的数据会被保留下来,如果右边rdd有对应的key值数据会显示,没有对应key值会显示为空
                              leftOuterJoin_rdd = rdd_kv1.leftOuterJoin(rdd_kv2)
                              # 右关联   右边rdd的数据会被保留下来,如果左边rdd有对应的key值数据会显示,没有对应key值会显示为空
                              rightOuterJoin_rdd = rdd_kv1.rightOuterJoin(rdd_kv2)
                              # 查看结果
                              res = union_rdd.collect()
                              print(f'union合并结果:{res}')
                              res2 = union_kv_rdd.collect()
                              print(f'kv_union合并结果:{res2}')
                              res3 = join_rdd.collect()
                              print(f'join内关联结果:{res3}')
                              res4 = leftOuterJoin_rdd.collect()
                              print(f'左关联结果:{res4}')
                              res5 = rightOuterJoin_rdd.collect()
                              print(f'右关联结果:{res5}')
                              
                              • 重分区

                                # 1、导入sparkcontext类
                                from pyspark import SparkContext
                                # 2、初始化SparkContext类型
                                # 没有指定master参数,默认使用本机资源
                                sc = SparkContext()
                                # 3、将数据转为rdd数据
                                # 转化Python数据
                                data_list = [1, 2, 3, 4, 5, 6]
                                rdd = sc.parallelize(data_list,numSlices=10)
                                map_rdd = rdd.map(lambda x:x+1)
                                # 修改rdd分区信息
                                # 指定修改的分区数  返回得到新的rdd
                                # repartition 在进行使用时更多进行的是减少分区数
                                repartition_rdd = map_rdd.repartition(4)
                                repartition_rdd2 = map_rdd.repartition(3)
                                # 查看结果信息
                                # glom() 查看当前rdd的分区信息
                                res = map_rdd.glom().collect()
                                print(res)
                                res2 = repartition_rdd.glom().collect()
                                print(res2)
                                res3 = repartition_rdd2.glom().collect()
                                print(res3)
                                
                                • 数据保存

                                  # 1、导入sparkcontext类
                                  from pyspark import SparkContext
                                  # 2、初始化SparkContext类型
                                  # 没有指定master参数,默认使用本机资源
                                  sc = SparkContext()
                                  # 3、将数据转为rdd数据
                                  # 转化Python数据
                                  data_list = [1, 2, 3, 4, 5, 6]
                                  rdd = sc.parallelize(data_list,numSlices=3)
                                  rdd.saveAsTextFile('/itcast111')
                                  
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]