Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Spark算子
    • 一、转换算子
      • coalesce函数
      • repartition函数
      • flatMap——flatMap变换
      • sample——抽样
      • zip——联结
      • mapValues——对Value值进行变换
    • 二、行动Action算子
      • 数据运算类行动算子
      • reduce——Reduce操作
      • collect——收集元素
      • countByKey——按Key值统计Key/Value型RDD中的元素个数
      • countByValue——统计RDD中元素值出现的次数
      • foreach——逐个处理RDD元素
      • lookup——查找元素
      • take——获取前n个元素
      • takeSample——提取n个元素
      • takeOrdered——获取排序后的前n个元素
      • 存储型行动算子
      • saveAsObjectFile——存储为二进制文件
      • saveAsTextFile——存储为文本文件
      • saveAsNewAPIHadoopFile——存储为Hadoop文件
    • 三、缓存算子
      • cache——缓存RDD
      • checkpoint——建立RDD的检查点
      • persist——持久化RDD
  • Spark启动参数以及调优记录
  • Spark-shell读取MySQL写入HDFS
  • Spark foreachRDD的正确使用
  • DataFrame函数
  • Spark WebUI更换使用端口
  • Spark stage如何划分
  • Spark使用HanLP分词
  • Spark RDD分区2G限制
  • Spark读取Hbase写入Hive
  • Ambari Spark 提交任务报错
  • JavaAPI提交Spark任务
  • SparkStreaming Kafka 自动保存offset到zookeeper
  • SparkStreaming参数介绍
  • SparkKerberos租约到期
  • Spark日志Log4j发送到Kafka
  • Spark --files介绍
  • SparkGraphX使用详解
  • Spark运行异常记录
  • 《Spark教程》笔记
Jast-zsh
2022-07-06
目录

Spark算子

# Spark算子

[toc]

# 一、转换算子

# coalesce函数

返回一个经过简化到numPartitions个分区的新RDD。这会导致一个窄依赖,例如:你将1000个分区转换成100个分区,这个过程不会发生shuffle,相反如果10个分区转换成100个分区将会发生shuffle。然而如果你想大幅度合并分区,例如合并成一个分区,这会导致你的计算在少数几个集群节点上计算(言外之意:并行度不够)。为了避免这种情况,你可以将第二个shuffle参数传递一个true,这样会在重新分区过程中多一步shuffle,这意味着上游的分区可以并行运行。

注意:第二个参数shuffle=true,将会产生多于之前的分区数目,例如你有一个个数较少的分区,假如是100,调用coalesce(1000, shuffle = true)将会使用一个 HashPartitioner产生1000个分区分布在集群节点上。这个(对于提高并行度)是非常有用的。如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDD的partition数变多的

# repartition函数

返回一个恰好有numPartitions个分区的RDD,可以增加或者减少此RDD的并行度。内部,这将使用shuffle重新分布数据,如果你减少分区数,考虑使用coalesce,这样可以避免执行shuffle

Repartition函数内部调用了coalesce函数 shuffle 为True

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {

  coalesce(numPartitions, shuffle = true)

}
1
2
3
4
5

# flatMap——flatMap变换

算子函数格式:

flatMap[U](f:FlatMapFunction[T,U]):JavaRDD[U]

在前面我们已经了解到map变换是对原RDD中的每个元素进行一对一变换生成

新RDD,而flatMap不同的地方在于,它是对原RDD中的每个元素用指定函数f进行一

对多(这也是lat前缀的由来)的变换,然后将转换后的结果汇聚生成新RDD.

示例:

flatMap示例代码

scala>valrdd=sc,parallelize(0 to 3,1)//生成由0-3序列构成的RDD

    rdd:org.apache,spark.rdd.RDD[Int]=ParallelCollectionRDD[17] at parallelize at:21

scala>val flatMappedRDD=rdd.flatMap(x=>0tox)//使用flatMap将每个原始变换为一个序列

   flatMappedRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[18] at flatMap at:23

scala>flatMappedRDD.collect//显示新的RDD

   res0:Array[Int]=Array(0,0,1,0,1,2,0,1,2,3)
1
2
3
4
5
6
7
8
9
10
11

# sample——抽样

算子函数格式:

sample(withReplacement:Boolean,fraction:Double,seed:Long):JavaRDD[T]

对原始RDD中的元素进行随机抽样,抽样后产生的元素集合构成新的RDD.

参数fraction 指定新集合中元素的数量占原始集合的比例.抽样时的随机数种子由seed指定.

参数withReplacement为false时,抽样方式为不放回抽样.

参数withReplacement为true时,抽样方式为放回抽样.

示例:

sample示例代码

1:scala>valrdd=sc.parallelize(0to9,1)//生成由0-9的序列构成的RDD

rdd:org.apache.spark.rdd.RDD [Int1=ParallelCollectionRDD[5]at parallelize at:21

2:scala>rdd.sample(false,0.5).collect//不放回抽样一半比例的元素生成新的RDD

res4:Array[Int]=Array(0,1,2,3,4,7)

3:rdd.sample(false,0.5).collect//再次不放回抽样一半比例的元素生成新的RDD

res7:Array [Int]=Array(0,1,3,6,8)

4:scala>rdd.sample(false,0.8).collect//不放回抽样80%比例的元素生成新的RDD

res8:Array[Int]=Array(0,1,2,5,6,8,9)

5:scala>rdd.sample(true,0.5).co1lect//放回抽样一半比例的元素生成新的RDD

res9:Array[Int]=Array(0,2,3,4,4,6,7,9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# zip——联结

算子函数格式:

zip[U](other:JavaRDDLike[U,_]):JavapairRDD(T,U]

输入参数为另一个RDD,zip变换生成由原始RDD的值为Key、输入参数RDD的值为Value依次配对构成的所有Key/Value对,并返回这些Key/Value对集合构成的新RDD.

示例:

zip示例代码

1:scala>val rdd1=sc.parallelize(0 to 4,1)//构建原始RDD

rdd_1:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[19]at parallelize at :21

2:scala>val rdd2=sc.parallelize(5 to 9,1)//构建输入参数RDD

rdd_2:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD [20]at parallelize at :21

3:scala>rdd_1.zip(rdd_2).collect//对两个RDD进行联结

res5:Array[(Int,Int)]=Array((0,5),(1,6),(2,7),(3,8),(4,9})
1
2
3
4
5
6
7
8
9
10
11

# mapValues——对Value值进行变换

算子函数格式:

mapValues[u](f:Function[v,U]):JavapairRDD[K,U]

将Key/Value型RDD中的每个元素的Value值,使用输入参数函数f进行变换,生成新的RDD.

示例:

1:scala>val pairs=sc.parallelize(List("apple","banana","berry","cherry","cumquat","haw"),1).keyBy(_.1ength)//构建原始RDD

pairs:org.apache.spark.rdd.RDDI(Int,String)]=MappedRDD[16]at keyBy at:12

2:scala>pairs.mapvalues(v=>v+""+V{0)).collect//生成将单词加单词首字母的RDD

res0:Array[(Int,string)]=Array{(5,apple a),(6,banana b),(5,berry b),(6,cherry c),7,cumquat c),(3,haw h))
1
2
3
4
5
6
7

# 二、行动Action算子

# 数据运算类行动算子

# reduce——Reduce操作

算子函数格式:

reduce(f:Function2[T,T,T]):T

对RDD中的每个元素依次使用指定的函数f进行运算,并输出最终的计算结果.

需要注意的是,Spark中的reduce操作与Hadoop中的reduce操作并不一样.在Hadoop中,reduce操作是将指定的函数作用在Key值相同的全部元素上.而Spark的reduce操作则是对所有元素依次进行相同的函数计算.

示例:

1:scala>val nums=sc.parallelize(0 to 9,5)//构建由数字0-9构成的RDD

nums:org,apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[18]at parallelize at:12

2:scala>nums.reduce(_+_)//计算RDD中所有数字的和
1
2
3
4
5

# collect——收集元素

算子函数格式:

co1lect():List[T]

collect的作用是以数组格式返回RDD内的所有元素.

示例:

1:scala>val data=sc.parallelize(List(1,2,3,4,5,6,7,8,9,0},2)//构建原始RDD

data;org.apache.spark.rdd.RDD[Int]=ParallelCo1lectionRDD[8]at parallelize at:12

2:scala>data.collect//显示原始RDD中的元素

res0:Array[Int]=Array(l,2,3,4,5,6,7,8,9,0)
1
2
3
4
5
6
7

# countByKey——按Key值统计Key/Value型RDD中的元素个数

算子函数格式:

countByKey():Map[K,Long]

计算Key/Value型RDD中每个Key值对应的元素个数,并以Map数据类型返回

统计结果.

示例:

1:scala>val pairRDD=sc.parallelize(List(("fruit","Apple"),("fruit","Banana"),{"fruit","Cherry "),{"vegetable","bean"),("vegetable","cucumber"),("vegetable","pepper")),2} //构建原始 RDD

pairRDD:org.apache.spark.rdd.RDD [(String,String)]=Paralle1 Collection RDD[3 Jat parallelize at :12

2:sca1a>pairRDD.countByKey //统计原始RDD中每个物品类型下的物品数量

res0:scala.collection.Map[String,Long]=Map(fruit->3,vegetable->3)
1
2
3
4
5
6
7

# countByValue——统计RDD中元素值出现的次数

算子函数格式:

countByValue():Map[T,Long]

计算RDD中每个元素的值出现的次数,并以Map数据类型返回统计结果.

countByValue示例代码

1:scala>val num=sc.parallelize(List(1,1,1,2,2,3),2)//构建原始RDD

num:org.apache.spark.rdd.RDD [Int]=ParallelcollectionRDD[4]at

parallelize at:12

2:scala>num.countByValue//统计原始RDD中每个数字出现的次数

res0:scala.collection.Map[Int,Long]=Map(2->2,1->3,3->1)
1
2
3
4
5
6
7
8
9

# foreach——逐个处理RDD元素

算子函数格式:

foreach(f:VoidFunction[(K,V)]):Unit

对RDD中的每个元素,使用参数f指定的函数进行处理.

示例:

1:scala>val words=sc.parallelize(List("A","B","C","D"),2)//构建原始 RDD

words;org.apache.spark.rdd,RDD[String]=ParallelCollectionRDD[9] at parallelize at :21

2:scala>words.foreach(x=>print1n(x+"is a letter."))/打印输出每个单词构造的一句话

Cis a letter.

Ais a letter.

Dis a letter.

Bis a letter.
1
2
3
4
5
6
7
8
9
10
11
12
13

# lookup——查找元素

算子函数格式:

lookup(key:K):List[V]

在Key/Value型的RDD中,查找与参数key相同Key值的元素,并得到这些元素

的Value值构成的序列.

示例:

1:scala>val pairs=sc.parallelize(List("apple","banana","berry","cherry","cumcquat","haw"),1).keyBy (_.1ength)//构建原始RDD

pairs:org.apache.spark.rdd.RDDt(Int,String)]=MapPartitionsRDD[13] at keyBy at:21

2:scala>pairs.collect

res18:Array [(Int,String)]=Array((5,apple),(6,banana),(5,berry),(6,cherry),(7,cumcuat),(3,haw))

3:scala>pairs.lookup(5)//查找长度为5的单词

res19:Seq[string]=WrappedArray (apple,berry)
1
2
3
4
5
6
7
8
9
10
11

# take——获取前n个元素

# takeSample——提取n个元素

# takeOrdered——获取排序后的前n个元素

# 存储型行动算子

# saveAsObjectFile——存储为二进制文件

算子函数格式:

saveAsobjectPile(path:string):Unit

将RDD转换为序列号对象后,以Hadoop SequenceFile文件格式保存,保存路径由

参数path指定.

示例:

1:scala>val data=sc.parallelize(0to9,1)//构建0-9组成的RDD

data:org.apache.spark.rdd.RDD[Int]=Paralle1CollectionRDD[40]at parallelize at :12

2:scala>data.saveAsobjectFile("obj")//将RDD以SequenceFile文件格式保存,文件名为obj
1
2
3
4
5

# saveAsTextFile——存储为文本文件

# saveAsNewAPIHadoopFile——存储为Hadoop文件

# 三、缓存算子

为了提高计算效率,Spark采用了两个重要机制:

①基于分布式内存数据集进行运算,也就是我们已经熟知的RDD;

②变换算子的惰性执行(Lazy Evaluation),即RDD的变换操作并不是在运行到该行代码时立即执行,而仅记录下转换操作的操作对象.只有当运行到一个行动算子代码时,变换操作的计算逻辑才真正执行.

这两个机制帮助Spark提高了运算效率,但正如'硬币都有两面'一样,在带来提升性能的好处的同时,这两个机制也留下了隐患.

例如:

①如果在计算过程中,需要反复使用某个RDD,而该RDD需要经过多次变换才能得到,则每次使用该RDD时都需要重复这些变换操作,这种运算效率是很低的;

②在计算过程中数据存放在内存中,如果出现参与计算的某个节点出现问题,则存放在该节点内存中的RDD数据会发生损坏.如果损坏的也是需要经过多次变换才能得到的RDD,此时虽然可以通过再次执行计算恢复该RDD,但仍然要付出很大的代价.因此,Spark提供了一类缓存算子,以帮助用户解决此类问题.

# cache——缓存RDD

算子函数格式:

cache():JavaRDD[T]

cache将RDD的数据持久化存储在内存中,其实现方法是使用后面我们会介绍的persist算子.当需要反复使用某RDD时,使用cache缓存后,可以直接从内存中读出,不再需要执行该RDD的变换过程.需要注意的是,这种缓存方式虽然可以提高再次使用某个RDD的效率,但由于cache后的数据仅仅存储在内存中,因此不能解决RDD出错时需要再次恢复运算的问题.而且cache保存的数据在Driver关闭后会被清除,因此不能被在其他Driver中启动的Spark程序使用.

示例:

1:scala>val num=sc.parallelize(0to9,1)//构建RDD

num:org,apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[7]at parallelize at:21

2:scala>val result=num.map(x=>x*x)//对原始RDD进行map变换

result:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD [8]at map at:23

3:scala>result.cache//对新RDD进行缓存

res19:result.type=MapPartitionsRDD[8 Jat map at :23

4:scala>result.count//统计新RDD中的元素个数

res30:Long=10

5:scala>result.collect().mkstring(',")//再次使用新RDD,生成用逗号分隔的序列

res31:String=0,1,4,9,16,25,36,49,64,81
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# checkpoint——建立RDD的检查点

算子函数格式:

checkpoint():Unit

对于需要很长时间才能计算出或者需要依赖很多其他RDD变化才能得到的RDD,如果在计算过程中出错,要从头恢复需要付出很大的代价.此时,可以利用checkpoint建立中间过程的检查点,Spark会将执行checkpoint操作的RDD持久化,以二进制文件的形式存放在指定的目录下.与cache不同的是,checkpoint保存的数据在Driver关闭后仍然以文件的形式存在,因此可以被其他Driver中的Spark程序使用.

示例:

1:scala>val rdd=sc.makeRDD(1to9,2)//构建原始RDD

rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]at makeRDD at :21

2:scala>val flatMapRDD=rdd.flatMap(x=>Seq(x,x))//对原始RDD做

flatMap变换

flatMapRDD:org.apache.spark.rdd.RDD [Int]=MappartitionsRDD[1]at flatMap at:23

3:scala>sc.setCheckpointDir("my_checkpoint")//指定checkpoint存放的目录

4:scala>flatMapRDD.checkpoint()//建立 checkpoint

5:scala>flatMapRDD.dependencies.head.rdd//显示变换后RDD的依赖

res2:org.apache.spark.rdd.RDD(_]=ParallelcollectionRDD[0]at makeRDD at:21

6:scala>flatMapRDD.collect()//显示变换后的RDD

res3:Array[Int]=Array(1,1,2,2,3,3,4,4,5,5,6,6,7,7,8,8,9,9)

7:scala>flatMapRDD.dependencies.head.rdd//再次显示变换后RDD的依赖

res4:org.apache.spark.rdd.RDD [_1=CheckpointRDD[2]at collect at:26
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# persist——持久化RDD

算子函数格式:

persist(newLeve1:storageLeve1):JavaRDD[T]

调用persist可对RDD进行持久化操作,利用参数newlevel可以指定不同的持久化方式,常用的持久化方式包括:

  • MEMORY_ONLY:仅在内存中持久化,且将RDD作为非序列化的Java对象存储在JVM中.这种方式比较轻量,是默认的持久化方式.

  • MEMORY_ONLY_SER:仅在内存中持久化,且将RDD作为序列化的Java对象存储(每个分区一个byte数组).这种方式比MEMORY_ONLY方式要更加节省空间,但会耗费更多的CPU资源进行序列化操作.

  • MEMORY_ONLY_2:仅在内存中持久化,且将数据复制到集群的两个节点中.

  • MEMORY_AND_DISK:同时在内存和磁盘中持久化,且将RDD作为非序列化的Java对象存储.

  • MEMORY_AND_DISK_SER:同时在内存和磁盘中持久化,且将RDD作为序列化的Java对象存储.

  • MEMORY_AND_DISK_2:同时在内存和磁盘中持久化,且将数据复制到集群的两个节点中.

persist示例代码

1:scala>val num=sc.parallelize(0to9,1)//构建RDD

num:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]at parallelize at :12

2:scala>num.getStorageLeve1//显示RDD当前的持久化状态

res8:org.apache.spark.storage.Storagelevel=StorageLevel{false,false,false,false,1)

3:scala>num.persist()//使用persist进行默认的MEMORY_ONLY持久化

res9:num.type=ParallelCollectionRDD [5] at parallelize at:21

4:scala>num.getStorageLeve1//显示RDD新的持久化状态

res10:org,apache,spark,storage.StorageLevel=StorageLevel(false,true,false,true,1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
上次更新: 2023/03/10, 16:49:38
Spark启动参数以及调优记录

Spark启动参数以及调优记录→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式