Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Spark算子
  • Spark启动参数以及调优记录
  • Spark-shell读取MySQL写入HDFS
  • Spark foreachRDD的正确使用
  • DataFrame函数
  • Spark WebUI更换使用端口
  • Spark stage如何划分
  • Spark使用HanLP分词
  • Spark RDD分区2G限制
  • Spark读取Hbase写入Hive
  • Ambari Spark 提交任务报错
  • JavaAPI提交Spark任务
  • SparkStreaming Kafka 自动保存offset到zookeeper
  • SparkStreaming参数介绍
  • SparkKerberos租约到期
  • Spark日志Log4j发送到Kafka
  • Spark --files介绍
  • SparkGraphX使用详解
  • Spark运行异常记录
  • 《Spark教程》笔记
Jast-zsh
2022-07-07

Spark读取Hbase写入Hive

# Spark读取Hbase写入Hive

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession



object WeiBoAccountFilter  {



  val tableName = "crawl:weibo_user"
  val insertTableName = "crawl:weibo_user_v"
  val hBaseConf = HBaseConfiguration.create()
  val conn=ConnectionFactory.createConnection(hBaseConf)
  val hbaseTable = conn.getTable(TableName.valueOf(tableName))
  val insertHbaseTable = conn.getTable(TableName.valueOf(insertTableName))


  case class WeiboUserSchemaClass(biFollowersCount :String,city :String,created_at :String,description :String,experience :String,followers_count :String,friends_count :String,name :String,profileImageUrl :String,province :String,statuses_count :String,uid :String,url :String,verified :String,verified_reason :String,verified_type :String,verified_type_ext :String)
  def main(args: Array[String]) {

    val spark = SparkSession
      .builder()
      //      .master("local[2]")
      .appName("WeiBoAccount-Verified")
      .config("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()    //如果要读取hive的表,就必须使用这个
      .getOrCreate()
    //    conf.set("spark.yarn.jars","hdfs://nameservice1/spark/jars/*.jar")
    //    conf.setMaster("local[2]")
    //    conf.setMaster("yarn-client")

    //    @transient lazy
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("INFO")
    hBaseConf.set(TableInputFormat.INPUT_TABLE,tableName)
    //    hBaseConf.set(TableInputFormat.SCAN_TIMESTAMP,"120000")
    hBaseConf.setInt("hbase.rpc.timeout", 200000)
    hBaseConf.setInt("hbase.client.operation.timeout", 200000)
    hBaseConf.setInt("hbase.client.scanner.timeout.period", 200000)
    val scan=new Scan()
    scan.withStartRow(Bytes.toBytes("0000"))
    scan.withStopRow(Bytes.toBytes("0001"))
    //scan.setCacheBlocks(false)   //为是否缓存块,默认缓存,我们分内存,缓存和磁盘,三种方式,一般数据的读取为内存->缓存->磁盘
    //scan.setBatch(100)   //为设置获取记录的列个数,默认无限制,也就是返回所有的列
    //scan.setCaching(1000) //每次从服务器端读取的行数,默认为配置文件中设置的值

    //将scan类转化成string类型
    val scan_str=convertScanToString(scan)
    hBaseConf.set(TableInputFormat.SCAN,scan_str)
    hBaseConf.set(TableInputFormat.SHUFFLE_MAPS,"true")
    //println("hbase.mapreduce.scan.timestamp:"+ hBaseConf.get("hbase.mapreduce.scan.timestamp"))
    //val rdd=sc.newAPIHadoopRDD(hBaseConf, classOf[HBaseInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    val rdd=sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable],
      classOf[Result])
    import spark.implicits._//这里的spark是上面的变量, 并非是spark带的
    val value :RDD[WeiboUserSchemaClass]= rdd.map(convertHive).filter(_!=null)

    value.foreach(println)
    val tempDS = value.repartition(10) .toDF()//转换为DataFrame,注册为表视图,转换为Dataset也可以
    //repartition数量会决定最终存入hive的文件数量与执行程序的并发数量,适当增加或减少该值,有助于性能提升
    tempDS.createTempView("test_table")//注册表视图,供sql查询
    spark.sql("desc test_table").show(false)
    val frame = spark.sql("select `name`,uid,url from test_table")
    //    frame.write.parquet("xxx") //保存至HDFS
    spark.sql("select  count(1) from test_table").show(false)
//    spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS test_table_0724(biFollowersCount String ,city String ,created_at String ,description String ,experience String ,followers_count String ,friends_count String ,name String ,profileImageUrl String ,province String ,statuses_count String ,uid String ,url String ,verified String ,verified_reason String ,verified_type String ,verified_type_ext String) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001' STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY')")
  //写入Hive表
      spark.sql("INSERT INTO dw_crawler.test_table_0724 SELECT biFollowersCount,city,created_at,description,experience,followers_count,friends_count,name,profileImageUrl,province,statuses_count,uid,url,verified,verified_reason,verified_type,verified_type_ext FROM test_table")
    spark.sql("select count(1) from dw_crawler.test_table_0724").show(false)
    //    rdd.count
    val num = rdd.count()
    println("数量:"+ num)
//    rdd.foreach(addRow)//插入Hbase数据库

    println("关闭所有连接")
    insertHbaseTable.close()
    hbaseTable.close()
    conn.close()
  }

  var i:Int = 0
  def delete(rowkey:String): Unit ={
    val delete = new Delete (rowkey.getBytes)
    i=i+1
    hbaseTable.delete (delete)
    println("删除成功"+rowkey)
  }

  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    return Base64.encodeBytes(proto.toByteArray)
  }

  //数据解析,插入hbase表
  def addRow(tuple : (ImmutableBytesWritable,Result)): Unit ={

    val row = tuple._2.getRow
    val put = new Put(row)
    val verified_type = Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"), Bytes.toBytes("verified_type")))
    if(!verified_type.equals("-1") &&
      !verified_type.equals("220") &&
      isIntByRegex(verified_type)) {
      addColumn(put, tuple._2, "fn", "biFollowersCount")
      addColumn(put, tuple._2, "fn", "city")
      addColumn(put, tuple._2, "fn", "created_at")
      addColumn(put, tuple._2, "fn", "description")
      addColumn(put, tuple._2, "fn", "experience")
      addColumn(put, tuple._2, "fn", "followers_count")
      addColumn(put, tuple._2, "fn", "friends_count")
      addColumn(put, tuple._2, "fn", "name")
      addColumn(put, tuple._2, "fn", "profileImageUrl")
      addColumn(put, tuple._2, "fn", "province")
      addColumn(put, tuple._2, "fn", "statuses_count")
      addColumn(put, tuple._2, "fn", "uid")
      addColumn(put, tuple._2, "fn", "url")
      addColumn(put, tuple._2, "fn", "verified")
      addColumn(put, tuple._2, "fn", "verified_reason")
      addColumn(put, tuple._2, "fn", "verified_type")
      addColumn(put, tuple._2, "fn", "verified_type_ext")
      insertHbaseTable.put(put)
    }
  }


  //将hbase查询出的数据转换为Schema
  def convertHive(tuple : (ImmutableBytesWritable,Result)): WeiboUserSchemaClass ={

    val row = tuple._2.getRow
    val put = new Put(row)
    val verified_type = Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"), Bytes.toBytes("verified_type")))
    if(!verified_type.equals("-1") &&
      !verified_type.equals("220") &&
      isIntByRegex(verified_type)) {
      val biFollowersCount=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("biFollowersCount")))
      val city=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("city")))
      val created_at=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("created_at")))
      val description=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("description")))
      val experience=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("experience")))
      val followers_count=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("followers_count")))
      val friends_count=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("friends_count")))
      val name=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("name")))
      val profileImageUrl=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("profileImageUrl")))
      val province=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("province")))
      val statuses_count=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("statuses_count")))
      val uid=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("uid")))
      val url=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("url")))
      val verified=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("verified")))
      val verified_reason=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("verified_reason")))
      val verified_type=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("verified_type")))
      val verified_type_ext=Bytes.toString(tuple._2.getValue(Bytes.toBytes("fn"),Bytes.toBytes("verified_type_ext")))
      return new WeiboUserSchemaClass(biFollowersCount,city,created_at,description,experience,followers_count,friends_count,name,profileImageUrl,province,statuses_count,uid
        ,url,verified,verified_reason,verified_type,verified_type_ext)
    }
    null
  }
  def addColumn(put :Put,tuple:Result,fn :String ,column :String): Unit ={
    put.addColumn(Bytes.toBytes(fn), Bytes.toBytes(column),tuple.getValue(Bytes.toBytes(fn), Bytes.toBytes(column)))
  }



  def isIntByRegex(s : String) = {
    val pattern = """^(\d+)$""".r
    s match {
      case pattern(_*) => true
      case _ => false
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
上次更新: 2023/03/10, 16:49:38
Spark RDD分区2G限制
Ambari Spark 提交任务报错

← Spark RDD分区2G限制 Ambari Spark 提交任务报错→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式