Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Spark算子
  • Spark启动参数以及调优记录
  • Spark-shell读取MySQL写入HDFS
  • Spark foreachRDD的正确使用
  • DataFrame函数
  • Spark WebUI更换使用端口
  • Spark stage如何划分
  • Spark使用HanLP分词
  • Spark RDD分区2G限制
  • Spark读取Hbase写入Hive
  • Ambari Spark 提交任务报错
  • JavaAPI提交Spark任务
  • SparkStreaming Kafka 自动保存offset到zookeeper
  • SparkStreaming参数介绍
  • SparkKerberos租约到期
  • Spark日志Log4j发送到Kafka
  • Spark --files介绍
  • SparkGraphX使用详解
  • Spark运行异常记录
  • 《Spark教程》笔记
Jast-zsh
2022-07-07

Spark foreachRDD的正确使用

常出现的使用误区:

**误区一:**在driver上创建连接对象(比如网络连接或数据库连接)

如果在driver上创建连接对象,然后在RDD的算子函数内使用连接对象,那么就意味着需要将连接对象序列化后从driver传递到worker上。而连接对象(比如Connection对象)通常来说是不支持序列化的,此时通常会报序列化的异常(serialization errors)。因此连接对象必须在worker上创建,不要在driver上创建。

dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // 数据库连接在driver上执行
  rdd.foreach { record =>
  connection.send(record) // 在worker上执行
  }
}
1
2
3
4
5
6

**误区二:**为每一条记录都创建一个连接对象

通常来说,连接对象的创建和销毁都是很消耗时间的。因此频繁地创建和销毁连接对象,可能会导致降低spark作业的整体性能和吞吐量。

dstream.foreachRDD { rdd =>
rdd.foreach { record =>
    val connection = createNewConnection() //每插入一条数据,创建一个连接
    connection.send(record)
    connection.close()
    }
}
1
2
3
4
5
6
7

比较正确的做法是:对DStream中的RDD,调用foreachPartition,对RDD中每个分区创建一个连接对象,使用一个连接对象将一个分区内的数据都写入数据库中。这样可以大大减少创建的连接对象的数量。

**正确做法一:**为每个RDD分区创建一个连接对象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
    }
}
1
2
3
4
5
6
7

**正确做法二:**为每个RDD分区使用一个连接池中的连接对象

dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
    // 从数据库连接池中获取连接
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // 用完以后将连接返    回给连接池,进行复用
}
}
1
2
3
4
5
6
7
8
上次更新: 2023/03/10, 16:49:38
Spark-shell读取MySQL写入HDFS
DataFrame函数

← Spark-shell读取MySQL写入HDFS DataFrame函数→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式