Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Spark算子
  • Spark启动参数以及调优记录
  • Spark-shell读取MySQL写入HDFS
    • 进入spark-shell
    • 执行代码
  • Spark foreachRDD的正确使用
  • DataFrame函数
  • Spark WebUI更换使用端口
  • Spark stage如何划分
  • Spark使用HanLP分词
  • Spark RDD分区2G限制
  • Spark读取Hbase写入Hive
  • Ambari Spark 提交任务报错
  • JavaAPI提交Spark任务
  • SparkStreaming Kafka 自动保存offset到zookeeper
  • SparkStreaming参数介绍
  • SparkKerberos租约到期
  • Spark日志Log4j发送到Kafka
  • Spark --files介绍
  • SparkGraphX使用详解
  • Spark运行异常记录
  • 《Spark教程》笔记
Jast-zsh
2023-03-10
目录

Spark-shell读取MySQL写入HDFS

# Spark-shell读取MySQL写入HDFS

# 进入spark-shell

spark-shell \
--executor-memory 8g \
--total-executor-cores  4 \
--jars /var/lib/hadoop-hdfs/jast/test/mysql-connector-java-5.1.20.jar \
--driver-class-path /var/lib/hadoop-hdfs/jast/test/mysql-connector-java-5.1.20.jar
1
2
3
4
5

# 执行代码

//连接mysql,读取表
val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://10.248.111.11:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "tableName", "user" -> "root", "password" -> "rootpwd")).load()
mysqlDF.dtypes //查看数据类型 或者使用 mysqlDF.printSchema
mysqlDF.repartition(10).write.parquet("/test/table/sss")  //保存至hdfs
1
2
3
4

当执行 mysqlDF.dtypes或执行 mysqlDF.printSchema 时可以看见数据类型,这里的数据类型是mysql读取出来自动对应生成的类型

scala> mysqlDF.printSchema
root
 |-- accountId: long (nullable = false)
 |-- topic: string (nullable = false)
 |-- liveId: long (nullable = false)
 |-- buyCount: string (nullable = true)
 |-- goodsIndex: long (nullable = false)
 |-- itemH5TaokeUrl: string (nullable = true)
 |-- itemId: long (nullable = false)
 |-- itemName: string (nullable = true)
 |-- itemPic: string (nullable = true)
 |-- itemPrice: double (nullable = true)
 |-- itemUrl: string (nullable = true)
 |-- ishot: string (nullable = false)
 |-- bizdate: string (nullable = true)


scala> mysqlDF.dtypes
res23: Array[(String, String)] = Array((accountId,LongType), (topic,StringType), (liveId,LongType), (buyCount,StringType), (goodsIndex,LongType), (itemH5TaokeUrl,StringType), (itemId,LongType), (itemName,StringType), (itemPic,StringType), (itemPrice,DoubleType), (itemUrl,StringType), (ishot,StringType), (bizdate,StringType))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

如果我们想修改其中某一列数据类型,我们可以执行以下代码,拿 itemPrice 举例,我想把double 转换成string类型

import spark.implicits._ //导入这个为了隐式转换,或RDD转DataFrame之用
import org.apache.spark.sql.types.DataTypes
var sb = mysqlDF.withColumn("itemPrice",$"itemPrice".cast(DataTypes.StringType))

//也可以使用这种方式转换
val p = people.selectExpr("cast(itemPriceas string) itemPrice_bieming","xxx","xxxx")//这里要选择你需要的列
p.printSchema()//查看结构,也可以修改成功
1
2
3
4
5
6
7

查看类型,发现已经修改成功

scala> mysqlDF.dtypes
res23: Array[(String, String)] = Array((accountId,LongType), (topic,StringType), (liveId,LongType), (buyCount,StringType), (goodsIndex,LongType), (itemH5TaokeUrl,StringType), (itemId,LongType), (itemName,StringType), (itemPic,StringType), (itemPrice,DoubleType), (itemUrl,StringType), (ishot,StringType), (bizdate,StringType))

scala> sb.dtypes
res24: Array[(String, String)] = Array((accountId,LongType), (topic,StringType), (liveId,LongType), (buyCount,StringType), (goodsIndex,LongType), (itemH5TaokeUrl,StringType), (itemId,LongType), (itemName,StringType), (itemPic,StringType), (itemPrice,StringType), (itemUrl,StringType), (ishot,StringType), (bizdate,StringType))
1
2
3
4
5

将数据写入hive表,接着上面的sb变量写入

//将数据写入hdfs,注意目录使用临时目录
sb.repartition(10).write.parquet("/test/db_test/tableName_temp/date=test")
//将数据load到数据表,上面数据目录使用临时目录,load后数据会进入正式表数据目录
sql("load data inpath 'hdfs://nameservice1/test/db_test/tableName_temp/date=test'  into table source_taobao_live_product_now_new")  
//查看数据
sql("select * from source_taobao_live_product_now_new limit 1").show
1
2
3
4
5
6
上次更新: 2023/03/10, 17:30:33
Spark启动参数以及调优记录
Spark foreachRDD的正确使用

← Spark启动参数以及调优记录 Spark foreachRDD的正确使用→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式