Spark离线开发指南(详细版)
文章目录
- 1--Spark—core
- **2.1--RDD的创建**
- 2.1.1--并行化创建
- 2.1.2--获取分区数
- 2.1.3--读取文件创建RDD
- textFile
- wholeTextFile
- 2.2--RDD算子
- 2.2.1--算子概念
- 2.2.2--Transformation算子
- map
- flatMap
- reduceByKey
- mapValues
- groupBy
- filter
- distinct
- union
- join
- intersection
- glom
- groupByKey
- sortBy
- sortByKey
- keys
- values
- 2.2.3--Action算子
- countByKey
- collect
- reduce
- fold
- aggregate
- first
- take
- top
- count
- takeSample
- takeOrdered
- foreach
- saveAsTextFile
- collectAsMap
- 2.2.4--分区操作算子
- mapPartition
- foreachPartition
- partitionBy
- repartition
- coalesce
- 2.3--RDD缓存
- 2.3.1--RDD缓存的目的
- 2.3.1--RDD缓存的特点
- 2.3.2--RDD缓存的API
- 2.3.3--RDD的CheckPoint
- 2.4--广播变量
- 2.4.1--概念
- 2.4.2--API
- 2.5--累加器
- 2.5.1--需求
- 2.5.2--没有累加器的代码演示
- 2.5.3--增加累加器的代码
- 2.6--python程序对hdfs文件操作
- 2--SparkSQL
- 3.1--DataFrame
- 3.1.1--SparkSession对象环境的创建
- 3.1.2--DataFrame的组成
- 3.1.3--DataFrame代码构建
- 基于RDD方式1
- 基于RDD方式2
- 基于RDD方式3
- 基于Pandas的DataFrame
- 读取外部数据
- 读取mysql数据
- 读取hive数据
- 读取es数据
- 3.1.4--DataFrame DSL风格演示
- 3.1.5--DataFrame SQL风格演示
- 3.1.6--数据清理
- 数据去重
- 缺失值处理
- 3.1.7--数据写出
- 写出到文件
- 写出到mysql
- 写出到hive
- 写出到es
- 3.2--DataFrameAPI
- printSchema
- show
- createTempView
- createOrReplaceTempView
- createGlobalTempView
- select
- join
- filter
- where
- groupBy
- orderBy
- withColumn
- withColumnRenamed
- first
- limit
- dropDuplicates
- dropna
- fillna
- write
- 3.3 类似于RDD的API
- count
- collect
- take
- first
- head
- tail
- foreach
- foreachPartition
- distinct
- union/unionAll
- coalesce/repartition
- cache/persist
- unpersist
- columns
- schema
- rdd
- printSchema
- 3.4--GroupDataAPI
- count
- avg
- sum
- min
- max
- round
- agg
- 3.5--column的常用API
- alias
- astype
- between
- cast
- astype
- contains
- endswith(other)
- eqNullSafe(other)
- lit
- 3.6--UDF
- 3.6.1--UDF的定义
- 3.6.2--UDF的返回值
- 3.6.3--UDAF by RDD
- 3.7--窗口函数
- DSL风格
- SQL风格
- 3--spark优化
1–Spark—core
2.1–RDD的创建
2.1.1–并行化创建
概念:并行化创建是指:将本地集合=>分布式RDD,这一步就是分布式的开端:本地转分布式
(图片来源网络,侵删)API:
- parallelize(参数1,参数2)
- 参数1 集合对象即可 比如list
- 参数2 分区数,不写默认是电脑的线程数
# 导入spark相关的包 from pyspark import SparkConf,SparkContext if __name__ == '__main__': # 初始化sparkcontext的对象 conf = SparkConf().setAppName("text").setMaster("local[*]") sc = SparkContext(conf=conf) # 通过并行化的方式创建RDD rdd= sc.parallelize([1,2,3,4,5,6,7,8,9]) # parallelize没有给分区数 默认是电脑cpu的线程 print("默认分区",rdd.getNumPartitions()) rdd_par = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3) print("分区数",rdd_par.getNumPartitions()) # 打印rdd的内容 # collect方法是将rdd中的每个分区的数据都发送到Driver, # 形成一个python的list的对象 分布式对象==>本地集合 print("内容",rdd_par.collect())
2.1.2–获取分区数
getNumPartitions() 获取RDD的分区数量 返回值是Int数字
API:
- rdd.getNumPartitions()
2.1.3–读取文件创建RDD
textFile
textFile():可以读取本地数据,也可以读取hdfs数据
API:
-
sparkcontext.textFile(参数1,参数2)
-
参数1:必填,文件路径支持本地,支持HDFS,也支持一些比如S3协议
-
参数2:可选,表示最小分区数量
-
注意:参数2话语权不足,spark有自己的判断,在它的允许的范围内,参数2才有效果,超出spark允许的范围,参数2就失效
读取本地文件:
file_rdd1 = sc.textFile("../data/input/a.txt") print("默认读取分区数:",file_rdd1.getNumPartitions()) print("内容",file_rdd1.collect())
参数2的用法:
# 加最小分区数测试 最小分区数只是个参考值,spark有自己的判断 file_rdd2 = sc.textFile("../data/input/a.txt",5) file_rdd3 = sc.textFile("../data/input/a.txt",100) print("rdd2的分区数:",file_rdd2.getNumPartitions()) print("rdd3的分区数",file_rdd3.getNumPartitions())
读取HDFS文件:
hdfs_rdd = sc.textFile("hdfs://node1:8020/input/b.txt") print("hdfs_rdd分区数:",hdfs_rdd.getNumPartitions()) print("hdfs_rdd内容:",hdfs_rdd.collect())
wholeTextFile
wholeTextFile()适合读取一堆小文件,这个API适合读取小文件,因此文件的数据很小,分区很多,导致shuffle的几率更高,所以尽量少分区读取数据
API:
- sparkcontext.wholeTextFile(参数1,参数2)
- 参数1:必填,文件夹的目录,支持本地,支持HDFS,也支持一些比如S3协议
- 参数2:可选,表示最小分区数量
- 注意:参数2话语权不足,这API的分区数量最多只能开到文件数量
rdd = sc.wholeTextFiles("../data/input/tiny_files") print(rdd.map(lambda x:x[1]).collect())
返回结果为二元组的形式展示, 前一个值是文件路径, 后一个值为文件内容
2.2–RDD算子
2.2.1–算子概念
算子:分布式集合对象上的API称之为算子
分类:
- Transformation:转换算子
- Action:动作(行动)算子
转换算子:
- 定义:RDD的算子,返回值仍然是一个RDD的,称之为转换算子
- 特性:这类算子是lazy 懒加载的,如果没有action算子,Transformation算子是不工作的
动作算子:
- 定义:返回值不是RDD的就是action算子
2.2.2–Transformation算子
map
map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
API:
- rdd.map(func)
- func : f:(T)=>U
- f:表示这是一个函数(方法)
- (T)=> U 表示方法传入参数为任意了类型,返回值也是任意类型
- (A)=> A 表示传入参数为任意类型,返回值也是任意类型。但是传入值和返回值类型相同
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3) # 定义方法作为算子的传入 def add(data): return data*10 print(rdd.map(add).collect()) # 更简单的方式是定义lamda表达式来写的匿名函数 print(rdd.map(lambda data: data * 10).collect()) # 两种方式都可以,第一种适合复杂函数,第二种适合一行解决的函数 #结果 [10, 20, 30, 40, 50, 60, 70, 80, 90] [10, 20, 30, 40, 50, 60, 70, 80, 90]
flatMap
flatMap对rdd执行map操作,然后进行解除嵌套操作.
API:
- rdd.flatMap(func)
- func : f:(T)=>U
eg:
- 嵌套的list: list[[1,2,3,4,5,6],[7,8],[9]]
- 解除嵌套的list: list[1,2,3,4,5,6,7,8,9]
rdd = sc.parallelize(["hadoop spark hadoop","hadoop flink","spark hadoop"]) rdd2 = rdd.map(lambda line: line.split(" ")) # flatmap无需传参数就可以直接解除嵌套 rdd3 = rdd.flatMap(lambda line:line.split(" ")) #结果 ['hadoop', 'spark', 'hadoop', 'hadoop', 'flink', 'spark', 'hadoop']
reduceByKey
针对(K,V)型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作
API:
- rdd.reduceByKey(func)
- func:(v,v)=>v
- 接受两个传入参数(类型一致),返回一个值,类型和传入的要求一致
rdd = sc.parallelize([('a',1),('b',1),('c',1),('a',1),('a',1)]) print(rdd.reduceByKey(lambda a, b: a + b).collect()) #结果 [('b', 1), ('c', 1), ('a', 3)]
mapValues
分区操作算子 就只对value 进行操作
API:
rdd.mapValue(func)
rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('a',5)]) # 就只对value 进行操作 print(rdd.mapValues(lambda value: value * 10).collect()) #结果
groupBy
将RDD的数据进行分组
API:
- rdd.groupBy(func)
- func(T)=>K
- 函数要求传入一个参数,返回一个返回值,类型无所谓
- 这个函数是拿到你的输入值后,将所有相同输入值的放在一个组里
- 分组完成后,每个组是一个二元组,key就是输入值,所有的数据都放入一个迭代器里作为value
rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)]) # groupby 传入的函数的意思是:通过这个函数,确定按照谁来分组(返回谁即可) result_rdd = rdd.groupBy(lambda t:t[0]) print(result_rdd.collect()) # 将value的值强制转换,才能查看迭代器的内容 print(result_rdd.map(lambda t: (t[0], list(t[1]))).collect()) #结果
filter
过滤想要的数据进行保留
API:
- rdd.filter(func)
- func(T)=>bool
- 传入任意类型的参数,返回值是False或者True
rdd = sc.parallelize([1,2,3,4,5]) # 通过filter算子过滤出奇数 result_rdd = rdd.filter(lambda x:x%2 == 1) print(result_rdd.collect()) #结果
distinct
对RDD数据进行去重,返回新的RDD
API:
- rdd.distinct(参数1)
- 参数1,去重分区数量,一般不用传
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 1, 2, 5, 9, 3]) # 去重 print(rdd.distinct().collect()) #结果
union
2个RDD合并成1个RDD返回
API:
- rdd.union(other_rdd)
- 注意只会合并不会去重
- 不同类型的RDD也可以混合
- 合并后的分区数是rdd1和rdd2的分区数和
rdd = sc.parallelize([1, 2, 3, 1, 2, 5, 9, 3]) rdd1 = sc.parallelize(["c", "b", "a"]) # 将两个rdd进行合并 print(rdd.union(rdd1).collect()) #结果
join
分区操作算子 对两个RDD进行Join操作(可实现sql的内\外连接)
API:
-
注意:join算子只能用于二元元组
-
rdd.join(other_rdd) 内连接
-
rdd.leftOuterJoin(other_rdd)左外连接
-
rdd.rightOuterJoin(other_rdd)右外连接
rdd1 = sc.parallelize([(1001,"zhangshan"),(1002,"lisi"),(1003,"wangwu"),(1004,"zhaoliu")]) rdd2 = sc.parallelize([(1001,"销售部"),(1002,"科技部")]) # 通过join来进行rdd间的关联 # 对于join来说,关联条件按照二元组key来进行关联 print(rdd1.join(rdd2).collect()) # 左外连接 print(rdd1.leftOuterJoin(rdd2).collect()) # 右外连接 print(rdd1.rightOuterJoin(rdd2).collect()) #结果
intersection
求2个RDD的交集,返回一个新的rdd
API:
- rdd.intersection(other_rdd)
rdd1 = sc.parallelize([1,2,3,4,'a','b']) rdd2 = sc.parallelize([2,3,4,'b']) # 求rdd间的交集 print(rdd1.intersection(rdd2).collect()) #结果
glom
将RDD的数据加上嵌套,这个嵌套按照分区来进行
API:
- rdd.glom()
eg:
- rdd数据为[1,2,3,4,5]有两个分区,那么glom后,数据就有可能变成[[1,2,3],[4,5]]
rdd = sc.parallelize([1,2,3,4,5,6,7,8],3) # glom可以查看分区排布 print(rdd.glom().collect()) #结果
groupByKey
针对KV型RDD,自动按照key分组
API:
- rdd.groupByKey()
rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)]) # 与groupby相同,只是不需要指定 直接就是按照key分组的, # 并且第二个值是相同key的value的集合 rdd1 = rdd.groupByKey() rdd2 = rdd1.map(lambda x:(x[0],list(x[1]))) print(rdd2.collect()) #结果 [('b', [1, 1, 1]), ('a', [1, 1])]
sortBy
对RDD数据进行排序,基于你指定的排序依据
API:
-
rdd.sortBy(func,ascending=False,numPartition=1)
-
func: (T)=>U 告知按照rdd中的哪个数据进行排序 比如:lambda x:x[1] 表示按照rdd中的第二列元素进行排序
-
ascending True升序 False降序
-
numPartition 用多少分区进行排序
rdd = sc.parallelize([('a', 1), ('a', 2), ('b', 1), ('b', 3), ('c', 1)],3) # 使用sortby对数据进行排序 # 按照value进行排序 # 注意 如果要全局有序 排序分区数应设为1 rdd1 = rdd.sortBy(lambda x:x[1],ascending=True,numPartitions=1) print(rdd1.collect()) #结果 [('a', 1), ('b', 1), ('c', 1), ('a', 2), ('b', 3)]
sortByKey
针对KV型RDD,按照key进行排序
API:
- sortByKey(ascending=True,numPartitions=None,keyfunc=
- ascending:升序or降序,True升序, False降序,默认是升序
- numPartitions:按照几个分区进行排序,如果全局有序,设置1
- keyfunc:在排序前对key进行处理,语法是: (k)→ U ,一个参数传入,返回一个值
rdd = sc.parallelize([('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)],3) # keyfunc = lambda key: str(key).lower() 忽略大小写的影响 rdd1 = rdd.sortByKey(ascending=True,numPartitions=3,keyfunc=lambda key:str(key).lower()) print(rdd1.collect()) #结果 [('A', 1), ('a', 2), ('b', 1), ('b', 3), ('C', 1)]
keys
概述: rdd必须是键值对类型的数据, 获取键值对所有的键(key).
values
概述: rdd必须是键值对类型的数据, 获取键值对所有的值(value).
2.2.3–Action算子
countByKey
统计key出现的次数(一般适用于KV型的RDD)
rdd = sc.textFile("../data/input/a.txt") rdd2 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1)) # 通过countbykey来对key进行计数 这是一个Action算子 不能colloct result = rdd2.countByKey() print(result) print(type(result)) #结果 defaultdict(, {'hadoop': 2, 'hello': 2, 'zxz': 2, 'wrx': 2})
collect
将RDD各个分区内的数据,统一收集到Driver中,形成List对象
API:
- rdd.collect()
- 这个算子,是将RDD各个分区数据都拉取到Driver
- 注意的是, RDD是分布式对象,其数据量可以很大,所以用这个算子之前要心知肚明的了解结果数据集不会太大。不然,会把Driver内存撑爆
reduce
对RDD数据集按照传入的逻辑进行聚合
API:
- rdd.reduce(func)
- func:(T,T)=>T
- 两个参数传入,一个返回值,返回值和参数要求类型一致
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9]) # reduce是action算子 result = rdd.reduce(lambda a,b:a+b) print(result) #结果 45
fold
和reduce一样,接受传入逻辑进行聚合,聚合是带有初始值的,这个初始值聚合会作用在分区内和分区间
API:
- rdd,fold(初始值,func)
eg:
- 比如:[[1,2,3],[4,5,6],[7,8,9]]数据分布在3个分区
- 分区1 123聚合的时候带上10作为初始值得到16
- 分区2 456聚合的时候带上10作为初始值得到25
- 分区3 789聚合的时候带上10作为初始值得到34
- 3个分区的结果做聚合也带上初始值10,所以结果是: 10+16+ 25+34=85
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3) # print(rdd.glom().collect()) result = rdd.fold(10,lambda a,b:a+b) print(result) #结果 85
aggregate
和fold算子相同, 唯一不同的是可以分别定义分区内计算函数和分区外计算函数
rdd1 = sc.parallelize(range(5), 2) print(rdd1.glom().collect()) # aggregate算子演示 seqOp = (lambda x,y:x+y) comOp = (lambda x,y:x+y+1) print(rdd1.aggregate(2, seqOp, comOp))
-
下图所示的是三个算子执行时的计算过程, 这里使用的是累加的过程
-
fold和aggregate计算过程是一样的, 但是唯一区别是aggregate可以指定分区内外的计算方式不同.
-
设置初始值的算子,初始值被加分区数n+1次
first
取出RDD第一个元素
rdd = sc.parallelize([1,4,3,2,7,6,2,1,9]) print(rdd.first()) #结果 1
take
取RDD的前N个元素,组合成List返回
注意: take是将rdd中前n个元素拉取到Driver所在主机的内存中, 由于driver的内存是有限的, 数据过多会造成driver的内存溢出.
print(rdd.take(4)) #结果 [1, 4, 3, 2]
top
对RDD数据集进行降序排序,取前N个
print(rdd.top(3)) #结果 [9, 7, 6]
count
计算RDD有多少个数据,返回值是个数字
print(rdd.count()) #结果 9
takeSample
随机抽样RDD的数据
API:
- takeSample(参数1:True or False,参数2:采样数,参数3:随机数种子)
- 参数1:True表示运行取同一个数据,False表示不允许取同一个数据. 和数据内容无关,是否重复表示的是同一个位置的数据
- 参数2:抽样要几个
- 参数3∶随机数种子,这个参数传入—个数字即可,随意给
rdd = sc.parallelize([1,2,1,4,1,6,7,1,2]) print(rdd.takeSample(False, 5,1)) #结果 [6, 7, 1, 1, 4]
takeOrdered
对RDD进行排序取前N个
API:
- rdd.takeordered(参数1,参数2)
- 参数1要几个数据
- 参数2对排序的数据进行更改(不会更改数据本身,只是在排序的时候换个样子)
- 这个方法使用按照元素自然顺序升序排序,如果你想玩倒叙,需要用参数2来对排序的数据进行处理
rdd = sc.parallelize([1, 2, 9, 2, 1, 5, 7, 3, 9]) # 默认是升序并选取前n个元素 print(rdd.takeOrdered(3)) # 增加匿名函数可以使其降序排序并选取前n个 == top print(rdd.takeOrdered(3, lambda x: -x)) #结果 [1, 1, 2] [9, 9, 7]
foreach
对RDD的每一个元素,执行提供的逻辑操作(和map一样),但这个方法没有返回值
注意: 但是它执行的时候, 每个分区会抢占cpu资源,从而会导致数据乱序的情况
API:
- rdd.foreach(func)
- func(T)=>None
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9]) result = rdd.foreach(lambda x: x * 10) # foreach对元素进行处理后没有返回值 print(result) # 这种方式的输出是由executor输出的而不是drive result = rdd.foreach(lambda x: print(x * 10)) #结果 None 50 60 70 80 90 10 20 30 40
saveAsTextFile
将RDD的数据写入文本文件,支持本地写出,hdfs等文件系统
- 注意:写出的时候,每个分区所在的Task直接控制数据写出到目标文件系统中,所以才会一个分区产生1个结果文件.
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3) # 将内容写到本地 # rdd.saveAsTextFile("../data/output/out2") # 将内容保存到hdfs rdd.saveAsTextFile("hdfs://node1:8020/input/output/out1") # saveastextfile中是由executor来写入的
- action算子中foreach和saveAsTextFile算子是分区直接执行的,其余的Action算子都回将结果发送至Driver
collectAsMap
概述: 将二元组的RDD转化为字典(dict). 将它转为rdd,就相当于将其转为本地,不再是弹性分布式数据集rdd了.
2.2.4–分区操作算子
mapPartition
Transformation算子 mapPartition—次被传递的是一整个分区的数据作为一个迭代器(一次性list)对象传入过来
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3) def process(iter): result = list() for it in iter: result.append(it*10) return result # mapParttion 是将一个分区的值按照list或者迭代器送到计算的地方 # 相比map 可以提高网络传送的性能 print(rdd.mapPartitions(process).collect()) #结果 [10, 20, 30, 40, 50, 60, 70, 80, 90]
foreachPartition
Action算子 和普通的foreach一致,但是一次处理的是一整个分区数据
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9],3) def process(iter): result = list() for it in iter: result.append(it*10) return print(result) # 与mapPartition相同,但是不返回值 rdd.foreachPartition(process) #结果 [10, 20, 30] [40, 50, 60] [70, 80, 90]
- foreachPartition就是一个没有返回值的mapPartitions
partitionBy
Transformation算子 对RDD进行自定义分区
API:
- rdd.partitionBy(参数1,参数2)
- 参数1重新分区后有几个分区
- 参数2自定义分区规则,函数传入
- 参数2:(K)→ int 一个传入参数进来,类型无所谓,但是返回值一定是int类型将key传给这个函数,你自己写逻辑,决定返回一个分区编号
- 分区编号从0开始,不要超出分区数-1
rdd = sc.parallelize([('a',1),('b',2),('a',3),('c',2),('d',5)]) # 使用partitionby实现自定义分区 def process(key): if 'a' == key or 'b' ==key: return 0 if 'c' == key: return 1 return 2 print(rdd.partitionBy(3, process).glom().collect()) #结果
repartition
Transformation算子 对RDD的分区执行重新分区(仅数量)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3) # repartition修改分区 # 使用该API时,尽量降分区数,因为用该api会影响内存迭代管道 print(rdd.repartition(1).getNumPartitions()) print(rdd.repartition(5).getNumPartitions())
coalesce
Transformation算子 对分区数量增减
API:
- rdd.coalesce(参数1,参数2)
- 参数1,分区数
- 参数2,True or False
- True表示允许shuffle,也就是可以加分区
- False表示不允许shuffle,也就是不能加分区,False是默认
# coalesce 修改分区 只有将shuffle改为true才能按照所给的数分区, # 要不然会自动进行安全鉴定而进行分区 print(rdd.coalesce(1).getNumPartitions()) print(rdd.coalesce(5,shuffle=True).getNumPartitions())
2.3–RDD缓存
2.3.1–RDD缓存的目的
- RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消失。
- RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。
- 这个特性可以最大化的利用资源,老l日RDD没用了就从内存中清理,给后续的计算腾出内存空间。
- RDD的缓存技术: Spark提供了缓存APIl,可以让我们通过调用API,将指定的RDD数据保留在内存或者硬盘上缓存的API
2.3.1–RDD缓存的特点
-
缓存技术可以将过程RDD数据持久化保存到内存或者硬盘上,但是,这个保存在设定上是认为不安全的。
-
缓存的数据在设计上是认为有丢失风险的,所以缓存有一个特点就是:其保留RDD之间的血缘(依赖)关系
一旦缓存丢失,可以基于血缘关系的记录,重新计算这个RDD的数据
-
缓存如何丢失:在内存中的缓存是不安全的,比如断电\计算任务内存不足,把缓存清理给计算让路
硬盘中因为硬盘损坏也是可能丢失的。
-
RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上,这是分散存储
2.3.2–RDD缓存的API
from pyspark.storagelevel import StorageLevel
rdd3.cache() 缓存到内存中
rdd3. persist(StorageLevel.MEMORY_ONLY) 仅内存缓存
rdd3. persist(StorageLevel.MEMORY_ONLY_2) 仅内存缓存,2个副本
rdd3. persist(StorageLevel.DISK_ONLY) 仅缓存硬盘上
rdd3.persist(StorageLevel.DISK_ONLY_2) 仅缓存硬盘上,2个副本
rdd3.persist(storageLevel.DISK_ONLY_3) 仅缓存硬盘上,3个副本
rdd3.persist(StorageLevel.MEMORY_AND_DISK) 先放内存,不够放硬盘
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)先放内存,不够放硬盘,2个副本
rdd3.persist(StorageLevel.OFF_HEAP) 堆外内存(系统内存)
如上API,自行选择使用即可
一般建议使用rdd3.persist(StorageLevel.MEMORY_AND_DISK)
如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK_ONLY)或者就别用缓存了用CheckPoint
主动清理缓存的API rdd . unpersist()
from pyspark import SparkContext, SparkConf from pyspark.storagelevel import StorageLevel if __name__ == '__main__': conf = SparkConf().setAppName("Text").setMaster("local[*]") sc = SparkContext(conf=conf) rdd1 = sc.textFile("../data/input/a.txt") rdd2 = rdd1.flatMap(lambda x:x.split(" ")) rdd3 = rdd2.map(lambda x:(x,1)) # 增加缓存 rdd3.cache() # 自定义缓存 rdd3.persist(StorageLevel.MEMORY_AND_DISK_2) rdd4 = rdd3.reduceByKey(lambda a,b:a+b) print(rdd4.collect()) rdd5 = rdd3.groupByKey() rdd6 = rdd5.mapValues(lambda x:sum(x)) print(rdd6.collect()) # 解除缓存 rdd3.unpersist()
2.3.3–RDD的CheckPoint
CheckPoint技术,也是将RDD的数据,保存起来,但是它仅支持硬盘存储,并且,它被设计认为是安全的不保留血缘关系
特点:
- CheckPoint存储RDD数据,是集中收集各个分区数据进行存储,而缓存是分散存储
- CheckPoint不管分区数量多少,风险是一样的,缓存分区越多,风险越高
- CheckPoint支持写入HDFS,缓存不行, HDFS是高可靠存储, CheckPoint被认为是安全的.
- CheckPoint不支持内存,缓存可以,缓存如果写内存性能比CheckPoint要好一些
- CheckPoint因为设计认为是安全的,所以不保留血缘关系,而缓存因为设计上认为不安全,所以保留
使用:
CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用CheckPoint比较合适,或者数据量很大,用CheckPoint比较合适.
如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要,直接缓存即可.
Cache和CheckPoint两个API都不是Action类型,所以想要它俩工作,必须在后面接上Action,接上Action的目的,是让RDD有数据,而不是为了让checkPoint和Cache工作。
from pyspark import SparkContext, SparkConf from pyspark.storagelevel import StorageLevel if __name__ == '__main__': conf = SparkConf().setAppName("Text").setMaster("local[*]") sc = SparkContext(conf=conf) # 1、告知spark开启checkpoint功能 sc.setCheckpointDir("hdfs://node1:8020/input/output/ckp") rdd1 = sc.textFile("../data/input/a.txt") rdd2 = rdd1.flatMap(lambda x:x.split(" ")) rdd3 = rdd2.map(lambda x:(x,1)) # 2、调用checkpoint的api 保存数据即可 rdd3.checkpoint() rdd4 = rdd3.reduceByKey(lambda a, b: a + b) print(rdd4.collect()) rdd5 = rdd3.groupByKey() rdd6 = rdd5.mapValues(lambda x: sum(x)) print(rdd6.collect()) # 解除缓存 rdd3.unpersist()
2.4–广播变量
2.4.1–概念
本地list对象,被发送到每个分区的处理线程上使用,也就是一个executor内,其实存放了2份一样的数据。
executor是进程,进程内资源共享,这2份数据没有必要,造成了内存浪费。
如果将本地list对象标记为广播变量对象,那么当上述场景出现的时候,Spark只会,给每个Executor来一份数据,而不是像原本那样,每一个分区的处理线程都来一份,节省内存.
2.4.2–API
1.将本地list标记成广播变量是即可
broadcast = sc. broadcast(stu_info_list)
2.使用广播变量,从broadcast对象中取出本地list对象即可value = broadcast.value
- 也就是先放进去broadcast内部,然后从broadcast内部在取出来用,中间传输的是broadcast这个对象了
- 只要中间传翰的是broadcast对象,spark就会留意,只会给每个Executor发一份了,而不是傻傻的哪个分区要都给.
from pyspark import SparkContext,SparkConf if __name__ == '__main__': conf = SparkConf().setAppName("text").setMaster("local[*]") sc = SparkContext(conf=conf) stu_info_list = { (1,'zxz',11), (2,'wrx',13), (3,'小明',11), (4,'王大力',11) } # 1.将本地python list对象标记为广播变量 broadcast = sc.broadcast(stu_info_list) score_info_rdd = sc.parallelize([ (1,'语文',99), (2,'数学',99), (3,'英语',99), (4,'数学',99), (2,'编程',99), (4,'语文',99), (1,'数学',99) ]) def map_func(data): id = data[0] name = '' # 匹配分布list和rdd分布式的id # 2.在使用到本地集合对象的地方,从广播变量中取出来用即可 for stu_info in broadcast.value: stu_id = stu_info[0] if id == stu_id: name = stu_info[1] return (name,data[1],data[2]) print(score_info_rdd.map(map_func).collect()) """ 场景:本地集合对象和分布式对象(RDD)进行关联的时候 需要将本地集合封装为广播变量 可以节省: 1.网络io的次数 2.Executor的内存占用 """ #结果
2.5–累加器
2.5.1–需求
想要对map算子计算中的数据,进行计数累加,得到全部数据计算完后的累加结果
2.5.2–没有累加器的代码演示
from pyspark import SparkContext,SparkConf if __name__ == '__main__': conf = SparkConf().setAppName("text").setMaster("local[*]") sc = SparkContext(conf=conf) rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2) count = 0 def map_func(data): global count count += 1 print(count) rdd.map(map_func).collect() print(count) # 可以看出count最外面的值没有变化 #结果 0
2.5.3–增加累加器的代码
原因:
count来自driver对象,当在分布式的map算子中需要count对象的时候,driver会将count对象发送给每一个executor一份(复制发送),每个executor各自收到一个,在最后执行print(count)的时候,这个被打印的count依旧是driver的那个,所以不管executor中累加到多少,都和driver这个count无关。
API:
- sc.accumulator(初始值)
- 这个对象唯一和前面提到的count不同的是
- 这个对象可以从各个Executor中收集它们的执行结果,作用回自己身上
from pyspark import SparkContext,SparkConf if __name__ == '__main__': conf = SparkConf().setAppName("text").setMaster("local[*]") sc = SparkContext(conf=conf) rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10],2) # spark提供的累加器变量,参数是初始值 scmlt = sc.accumulator(0) def map_func(data): global scmlt scmlt += 1 # print(scmlt) rdd2 = rdd.map(map_func) rdd2.collect() rdd3 = rdd2.map(lambda x:x) rdd3.collect() print(scmlt) #结果 20
如上代码,第一次rdd2被action后,累加器值是10,然后rdd2就没有了(没数据了),当rdd3构建出来的时候,是依赖rdd2的, rdd2没数据,那么rdd2就要重新生成.重新生成就导致累加器累加数据的代码再次被执行,所以代码的结果是20。
注意事项:
也就是,使用累加器的时候,要注意,因为rdd是过程数据,如果rdd被多次使用可能会重新构建此rdd如果累加器累加代码,存在重新构建的步骤中,累加器累加代码就可能被多次执行。
如何解决:加缓存或者checkPoint即可.
2.6–python程序对hdfs文件操作
from hdfs.client import Client client = Client('http://node1:9870') # 创建文件夹 client.makedirs('/datas/aaa') # 删除文件夹 client.delete('/datas/output') # 上传文件 client.upload('/datas/aaa/a.txt','E:/aa/a.txt') # 下载文件 client.download('/datas/a.txt','E:/a.txt')
2–SparkSQL
3.1–DataFrame
3.1.1–SparkSession对象环境的创建
- SparkSession对象
# Sparksession对象的导包 from pyspark.sql import SparkSession if __name__ == '__main__': # 构建session环境入口 spark = SparkSession.builder.appName("test").master('local[*]') \ .getOrCreate()
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
# 通过SparkSession对象获取SparkContext对象 sc = spark.sparkContext
- 用于SparkSQL编程作为入口对象
# SparkSql的helloworld df = spark.read.csv("../data/input/stu_score.txt", sep=',', header=False) df2 = df.toDF("id", "name", "score") df2.printSchema() df2.show() df2.createTempView("score") # SQL风格 spark.sql(""" SELECT * FROM score WHERE name='语文' LIMIT 5 """).show()
- 所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象
3.1.2–DataFrame的组成
DataFrame是一个二维表结构, 那么表格结构就有无法绕开的三个点:
- 行
列
- 表结构描述
比如,在MySQL中的一张表:
- 由许多行组成
- 数据也被分成多个列
- 表也有表结构信息(列、列名、列类型、列约束等)
基于这个前提,DataFrame的组成如下:
在结构层面:
- StructType对象描述整个DataFrame的表结构
- StructField对象描述一个列的信息
在数据层面:
- Row对象记录一行数据
- Column对象记录一列数据并包含列的信息
一个StructField记录:列名、列类型、列是否运行为空
多个StructField组成一个StructType对象。
一个StructType对象可以描述一个DataFrame:有几个列、每个列的名字和类型、每个列是否为空
同时,一行数据描述为Row对象,如Row(1, 张三, 11)
一列数据描述为Column对象,Column对象包含一列数据和列的信息
3.1.3–DataFrame代码构建
基于RDD方式1
DataFrame对象可以从RDD转换而来,都是分布式数据集,其实就是转换一下内部存储的结构,转换为二维表结构
API:
- spark.createDataFrame(data_rdd,schema=[‘name’,‘age’])
- 参数1 被转换的rdd
- 参数2 通过list的形式按照顺序依次提供字符串名称即可
from pyspark.sql import SparkSession if __name__ == '__main__': # 构建执行环境入口对象SparkSession spark = SparkSession.builder.appName('test').master('local[*]').\ getOrCreate() sc = spark.sparkContext # 基于rdd转换成Dataframe类型 file_rdd = sc.textFile('../data/input/sql/people.txt') data_rdd = file_rdd.map(lambda x:x.split(', ')).map(lambda x:(x[0],int(x[1]))) # 构建Dataframe对象 # 参数1 被转换的rdd # 参数2 通过list的形式按照顺序依次提供字符串名称即可 df = spark.createDataFrame(data_rdd,schema=['name','age']) # 打印DataFrame的表结构 df.printSchema() # 打印df的数据 # 参数1:展示多少条数据,默认不传的话是20 # 参数2:表示是否对列进截断,如果列的数据超过20个字符串长度, # 后续的内容不显示,以。。。代替,如果给False,表示不截断全部显示,默认是True df.show(20,False) # 将df对象转换成临时视图表,可供sql语句查询 df.createOrReplaceTempView("people") spark.sql(""" SELECT * FROM people WHERE age
- 行
- 所以,我们后续的代码,执行环境入口对象,统一变更为SparkSession对象
- 用于SparkSQL编程作为入口对象
- 用于SparkCore编程,可以通过SparkSession对象中获取到SparkContext
- SparkSession对象
-
- foreachPartition就是一个没有返回值的mapPartitions
- action算子中foreach和saveAsTextFile算子是分区直接执行的,其余的Action算子都回将结果发送至Driver
- 注意:写出的时候,每个分区所在的Task直接控制数据写出到目标文件系统中,所以才会一个分区产生1个结果文件.
-
- rdd,fold(初始值,func)
-
- rdd.groupByKey()
- rdd数据为[1,2,3,4,5]有两个分区,那么glom后,数据就有可能变成[[1,2,3],[4,5]]
- rdd.glom()
- rdd.intersection(other_rdd)
-
- 定义:返回值不是RDD的就是action算子
-
- rdd.getNumPartitions()