Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Spark算子
  • Spark启动参数以及调优记录
  • Spark-shell读取MySQL写入HDFS
  • Spark foreachRDD的正确使用
  • DataFrame函数
  • Spark WebUI更换使用端口
  • Spark stage如何划分
  • Spark使用HanLP分词
  • Spark RDD分区2G限制
  • Spark读取Hbase写入Hive
  • Ambari Spark 提交任务报错
  • JavaAPI提交Spark任务
  • SparkStreaming Kafka 自动保存offset到zookeeper
    • 场景
    • pom.xml
  • SparkStreaming参数介绍
  • SparkKerberos租约到期
  • Spark日志Log4j发送到Kafka
  • Spark --files介绍
  • SparkGraphX使用详解
  • Spark运行异常记录
  • 《Spark教程》笔记
Jast-zsh
2022-07-07
目录

SparkStreaming Kafka 自动保存offset到zookeeper

# SparkStreaming Kafka 自动保存offset到zookeeper

# 场景

spark使用的是1.6,SparkStreaming1.6时候使用的kafka jar包为0.8的,消费时候不记录消费到的信息,导致重复消费,故手动保存到zookeeper,SparkStreaming2.1.1时使用的kafka jar包为0.10,没有出现这种状况,以下是1.6版本的消费

package com.zsh.spark.streaming

import org.I0Itec.zkclient.ZkClient
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.ZKGroupTopicDirs
import org.apache.spark.streaming.kafka.OffsetRange
import org.apache.spark.streaming.kafka.HasOffsetRanges
import kafka.utils.ZkUtils
import kafka.consumer.SimpleConsumer
import kafka.api.TopicMetadataRequest
import kafka.api.PartitionOffsetRequestInfo
import kafka.api.OffsetRequest
import java.util.Properties
import java.io.FileInputStream

object Kafka2Es {
	def main(args: Array[String]) {
		val properties = new Properties()
//				val path = Thread.currentThread().getContextClassLoader.getResource("config.properties").getPath
				val path = Kafka2Es.getClass.getResourceAsStream("/config.properties")
				properties.load(path)
				
				val sprakConf = new SparkConf().setAppName("DirectKafkaWordCountDemo")
				//此处在idea中运行时请保证local[2]核心数大于2
				sprakConf.setMaster("local[4]")
				val ssc = new StreamingContext(sprakConf, Seconds(3))

			
				val brokers = properties.getProperty("kafka.brokers") //kafka地址ip:port,ip2:port
				val zookeeper = properties.getProperty("zookeeper.node") //zookeeper地址 ip:port,ip2:port2
				val kfkHost = brokers.split(",")(0).split(":")(0) 
				val kfkPort = brokers.split(",")(0).split(":")(1).toInt
				val topics = properties.getProperty("kafka.consumer.topic") 
				val groupId = properties.getProperty("kafka.consumer.group")
				val topicSet = topics.split(",").toSet
				val kafkaParams = Map[String, String](
						"metadata.broker.list" -> brokers,
						//				    "bootstrap.servers" -> brokers,
						//						"key.deserializer" -> "org.apache.kafka.common.serialization.StringSerializer",
						//						"key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
						//						"value.deserializer" -> "org.apache.kafka.common.serialization.StringSerializer",
						"group.id" -> groupId,
						"serializer.class" -> "kafka.serializer.StringEncoder",
						"auto.offset.reset" -> "smallest"
						//	"enable.auto.commit" -> (false: java.lang.Boolean)  是否自动提交offset
						//						"enable.auto.commit" -> "true",
						//						"client.id" -> "ssss",
						//						"auto.commit.interval.ms" -> (6*1000+"")//每隔60s自动提交一次
						)
				val topicDirs = new ZKGroupTopicDirs(groupId, topics)  //创建一个 ZKGroupTopicDirs 对象,对保存
				val zkTopicPath = s"${topicDirs.consumerOffsetDir}" //获取 zookeeper 中的路径,这里会变成 /consumers/test_spark_streaming_group/offsets/topic_name
		println("Group Offset在zookeeper路径为:"+zkTopicPath)
		val zkClient = new ZkClient(zookeeper)//连接Zookeeper
		val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}")//查询该路径下是否子节点(默认有子节点为我们自己保存不同 partition 时生成的)
		println("children size is "+children)
		var kafkaStream : InputDStream[(String, String)] = null 
		var fromOffsets: Map[TopicAndPartition, Long] = Map()  //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置
		if (children > 0) { 
		  //如果保存过 offset,这里更好的做法,还应该和  kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误
			//					for (i <- 0 to children-1) {
			//						val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")
			//						val tp = TopicAndPartition(topics, i);
			//						fromOffsets += (tp -> partitionOffset.toLong)  //将不同 partition 对应的 offset 增加到 fromOffsets 中
			//						println("@@@@@@ topic[" + topics + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
			//					}
			val partitions = getPartitionLeader(topics, kfkHost, kfkPort) //获取每个partition的leader,然后取每个partition中的最小值,与zookeeper保存的最小值比较,如果zookeeper保存的比partition最小值小则使用partition的值
					partitions.foreach(x=>{
						val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${x._1}")//x._1是partitions(map)的key  x._2是value
								val tp = TopicAndPartition(topics, x._1)
								val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
								val consumerMin = new SimpleConsumer(x._2.toString(), 9092, 10000, 10000, "getMinOffset")  //注意这里的 broker_host,因为这里会导致查询不到,解决方法在下面
								val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets
								println("最低:"+curOffsets.head)
								var nextOffset = partitionOffset.toLong
								if (curOffsets.length > 0 && nextOffset < curOffsets.head) {  // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择
									nextOffset = curOffsets.head
								}
						fromOffsets += (tp -> nextOffset) //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的想想
								println("ZshfromOffsets:"+fromOffsets)
					}
							)

					val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())  //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple
					kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
		}
		else {
			kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset
		}

		var offsetRanges = Array[OffsetRange]()
				kafkaStream.transform{ rdd =>
				offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset
				rdd
		}.foreachRDD { rdd =>
		for (o <- offsetRanges) {
			val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
			ZkUtils.updatePersistentPath(zkClient, zkPath, o.fromOffset.toString)  //将该 partition 的 offset 保存到 zookeeper
			println(s"@@@@@@ topic  ${o.topic}  partition ${o.partition}  fromoffset ${o.fromOffset}  untiloffset ${o.untilOffset} #######")
		}
		rdd.foreachPartition(
				message => {
					while(message.hasNext) {
						val value=message.next()._2.toString
								println(s"@^_^@   [" + value + "] @^_^@")
					}
				}
				)
		//      println ("Zsh:"+rdd.)
		rdd.map(record=>(record._1+"%%%%"+record._2)).foreach(println)
		}






		/**8
		 * 从指定位置开始读取kakfa数据
		 * 注意:由于Exactly  Once的机制,所以任何情况下,数据只会被消费一次!
		 *      指定了开始的offset后,将会从上一次Streaming程序停止处,开始读取kafka数据
		 */
		//				val offsetList = List((topics, 0, 0L),(topics, 1, 0L),(topics, 2, 0L),(topics, 3, 0L)) 
		//				val fromOffsets = setFromOffsets(offsetList)
		//				val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message())

		//				val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String,String)](ssc, kafkaParams, fromOffsets,messageHandler )
		//				messages.foreachRDD(
		//						mess => {
		//							//获取offset集合
		//							val offsetsList = mess.asInstanceOf[HasOffsetRanges].offsetRanges
		//									mess.foreachPartition(lines => {
		//										lines.map(s=>s._1+"!!!!"+s._2).foreach(println)
		//										//        lines.foreach(line => {
		//										//          val o: OffsetRange = offsetsList(TaskContext.get.partitionId)
		//										//          println("++++++++++++++++++++++++++++++此处记录offset+++++++++++++++++++++++++++++++++++++++")
		//										//          println(s"${o.topic}  ${o.partition}  ${o.fromOffset}  ${o.untilOffset}  ${o.untilOffset} ")
		//										//          println("+++++++++++++++++++++++++++++++此处消费数据操作++++++++++++++++++++++++++++++++++++++")
		//										//          println("The kafka  line is " + line)
		//										//        })
		//									})
		//						}
		//						)
		//				messages.print()
		//				val lines = messages.map(_._2).map(s=>s+":pipade")
		//				lines.print()

		ssc.start()
		ssc.awaitTermination()
	}
	//构建Map
	def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {
			var fromOffsets: Map[TopicAndPartition, Long] = Map()
					for (offset <- list) {
						val tp = TopicAndPartition(offset._1, offset._2)//topic和分区数
								fromOffsets += (tp -> offset._3)           // offset位置
					}
	fromOffsets
	}
	def getPartitionLeader(topic :String,kfkHost :String,kfkPort :Int):	Map[Int, String]={
			//	  	val topic_name = "test0920"     //topic_name 表示我们希望获取的 topic 名字
			val topic2 = List(topic)       
					val req = new TopicMetadataRequest(topic2, 0)
					val getLeaderConsumer = new SimpleConsumer(kfkHost, kfkPort, 10000, 10000, "OffsetLookup")  // 第一个参数是 kafka broker 的host,第二个是 port
					val res = getLeaderConsumer.send(req)
					val topicMetaOption = res.topicsMetadata.headOption
					val partitions = topicMetaOption match {
					case Some(tm) =>
					tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String]  // 将结果转化为 partition -> leader 的映射关系
					case None =>
					Map[Int, String]()
			}
			partitions
	}


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

# pom.xml

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka_2.10</artifactId>
	 <version>1.6.2</version>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming_2.10</artifactId>
	<version>1.6.2</version>
	<exclusions>
		<exclusion>
			<artifactId>scala-library</artifactId>
			<groupId>org.scala-lang</groupId>
		</exclusion>
	</exclusions>
</dependency>
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-core_2.10</artifactId>
	<version>1.6.2</version>
	<!-- <version>2.1.1</version> -->
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
上次更新: 2023/03/10, 16:49:38
JavaAPI提交Spark任务
SparkStreaming参数介绍

← JavaAPI提交Spark任务 SparkStreaming参数介绍→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式