Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Flink Standalone集群安装
  • Flink启动脚本
  • keyBy数据分配计算方法
  • Flink流处理高级操作之时间语义
  • Flink流处理之窗口Window
  • Flink流处理之ProcessFunction
  • Chain分隔
  • Backpressured详细介绍
  • Flink消费Kafka
    • 消费Kafka数据
      • 代码结构
      • Kafka消费工具类
      • Kafka自定义反序列化类
      • 消费处理类
      • 配置文件
      • 启动类
    • 异常处理
      • NullPointerException KafkaPartitionDiscoverer.getAllPartitionsForTopics
  • Flink操作MySQL
  • Flink自定义Connector-TableApi SQL
  • Flink使用异常处理
  • FlinkCDC
  • LinkageError异常处理
  • Flink日志Log4j发送到Kafka
  • 《Flink教程》笔记
Jast-zsh
2022-03-31
目录

Flink消费Kafka

# Flink消费Kafka

[toc]

# 消费Kafka数据

下面代码主要实现功能:

  1. 消费Kafka数据只获取消费数据
  2. 消费Kafka数据获取Kafka数据和Kafka数据的元数据(topic,partition,offset,key等)

# 代码结构

类名 说明
FlinkUtilsScala 创建 Kafka Stream 工具类
JastKafkaDeserializationSchema Kafka自定义反序列化类,用于返回Kafka消费详细信息,如:key,partition,offset等
AlertProcessWindowFunction 消费处理类,用于消费Kafka数据,进行具体处理使用
AlertApplication 启动类

# Kafka消费工具类

package com.rbt.util

import java.util.Properties

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerConfig

/**
 * @description 创建 Kafka Stream 工具类
 * @author Jast
*/
class FlinkUtilsScala {

  /**
   * 消费时返回结果只有数据,无其他元信息
   * @name createKafkaStream
   * @return scala.Function1<org.apache.flink.api.common.typeinfo.TypeInformation<T>,org.apache.flink.streaming.api.scala.DataStream<T>> 
   * @param env
   * @param parameters
   * @param clazz
   * @param topic
   * @author Jast
  */
  @throws[Exception]
  def createKafkaStream[T: TypeInformation](env: StreamExecutionEnvironment, parameters: ParameterTool,
                                            clazz: Class[_ <: DeserializationSchema[T]]
                                            , topic: String) = { //设置全局的参数
    val (props: Properties, list: _root_.java.util.ArrayList[_root_.scala.Predef.String]) = commonCreateKafkaStream(env, parameters, topic)
    //KafkaSource
    val kafkaConsumer: FlinkKafkaConsumer[T] = new FlinkKafkaConsumer[T](list, clazz.newInstance(), props)
    //默认开启,checkpoint成功后提交offset到kafka内部,仅供监控使用,该值存在误差
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
    env.addSource(kafkaConsumer)
  }


  /**
   * 消费时返回结果有元信息的方法,返回对象为:ConsumerRecord,该方法传入的为自定义Kafka序列化类,该序列继承KafkaDeserializationSchema
   * @name createKafkaStream
   * @return scala.Function1<org.apache.flink.api.common.typeinfo.TypeInformation<T>,org.apache.flink.streaming.api.scala.DataStream<T>>
   * @param env
   * @param parameters
   * @param clazz
   * @param topic
   * @author Jast
  */
  @throws[Exception]
  def createMetaStoreKafkaStream[T: TypeInformation](env: StreamExecutionEnvironment, parameters: ParameterTool,
                                            clazz: Class[_ <: KafkaDeserializationSchema[T]]
                                            , topic: String) = { //设置全局的参数
    val (props: Properties, list: _root_.java.util.ArrayList[_root_.scala.Predef.String]) = commonCreateKafkaStream(env, parameters, topic)
    //KafkaSource
    val kafkaConsumer: FlinkKafkaConsumer[T] = new FlinkKafkaConsumer[T](list, clazz.newInstance(), props)
    //默认开启,checkpoint成功后提交offset到kafka内部,仅供监控使用,该值存在误差
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true)
    env.addSource(kafkaConsumer)
  }

  /**
   * 创建Kafka Stream通用方法
   * @param env
   * @param parameters
   * @param topic
   * @tparam T
   * @return
   */
  private def commonCreateKafkaStream[T: TypeInformation](env: StreamExecutionEnvironment, parameters: ParameterTool, topic: String) = {
    env.getConfig.setGlobalJobParameters(parameters)

    //开启Checkpointing,同时开启重启策略
    env.enableCheckpointing(parameters.getLong("checkpoint.interval", 5000L), CheckpointingMode.EXACTLY_ONCE)

    //取消任务,checkpoint不删除
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    //设置异常重启次数与重启间隔时间 restart.attempts   delay.between.attempts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      parameters.getInt("restart.attempts", 5),
      parameters.getInt("delay.between.attempts", 3000)))

    val props = new Properties
    //指定Kafka的Broker地址
    props.setProperty("bootstrap.servers", parameters.getRequired("kafka.bootstrap.servers"))
    //指定组ID
    props.setProperty("group.id", parameters.getRequired("kafka.group.id"))
    //如果没有记录偏移量,第一次从最开始消费
    props.setProperty("auto.offset.reset", parameters.get("kafka.auto.offset.reset", "earliest"))
    //kafka的消费者不自动提交偏移量
    props.setProperty("enable.auto.commit", parameters.get("kafka.enable.auto.commit", "false"))

    props.setProperty("max.poll.records", parameters.get("max.poll.records", "1000"))

    props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "600000");

    val topics: String = topic
    val list = new java.util.ArrayList[String]
    topics.split(",").foreach(list.add)
    (props, list)
  }
}

object FlinkUtilsScala {

  def apply(): FlinkUtilsScala = new FlinkUtilsScala()

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

# Kafka自定义反序列化类

package com.jast.schema


import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.typeinfo.TypeHint
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

/**
 * @description 自定义Kafka序列化类,用于获取Kafka元数据
 * @author Jast
 */
class JastKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {

  override def isEndOfStream(nextElement: ConsumerRecord[String, String]) = false

  @throws[Exception]
  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
    //将Kafka消费的详细信息返回 ,返回类型为ConsumerRecord[String,String]
    new ConsumerRecord(
      record.topic,
      record.partition,
      record.offset,
      record.timestamp,
      record.timestampType,
      record.checksum,
      record.serializedKeySize,
      record.serializedValueSize,
      if (record.key() == null) "" else new String(record.key, StandardCharsets.UTF_8),
      if (record.value() == null) "" else new String(record.value, StandardCharsets.UTF_8))
  }

  /**
   * 用于获取反序列化对象的类型
   *
   * @return
   */
  override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# 消费处理类

import java.util.concurrent.TimeUnit
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

/**
 * 布控预警判断
 */
@SerialVersionUID(1L)
class AlertProcessWindowFunction extends ProcessWindowFunction[(Int, ConsumerRecord[String, String]), (String, Int), Int, TimeWindow] {

  override def open(parameters: Configuration): Unit = {

  }

  override def process(key: Int, context: Context, elements: Iterable[(Int, ConsumerRecord[String, String])], out: Collector[(String, Int)]): Unit = {

    elements.foreach(element=>{
      //输出Kafka消费的详细信息
      println(element._2.topic())
      println(element._2.partition())
      println(element._2.value())
      TimeUnit.SECONDS.sleep(10);
    })
    out.collect(("test", 0))

  }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 配置文件

  • alert.properties
# 消费Kafka
kafka.topics.alert=WB_1020000001,ZW_WB_1010000005,ZW_WB_1010000004,ZW_WB_1010000003,ZW_WB_1020000002,ZW_WB_1010000009
#kafka.topics.alert=WB_1020000001
#消费者group
kafka.group.id=test2
#kafka borkers
kafka.bootstrap.servers=192.168.60.16:9092
#第一次消费从哪里消费,默认earliest
kafka.auto.offset.reset=earliest
#是否自动提交,默认false
kafka.enable.auto.commit=false
#HugeGraph批量写入时间
flink.batch.time.milliseconds=1000
#定时统计数量
flink.count.time.milliseconds=10000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 启动类

需要获取详细信息则需要使用:

val stream: DataStream[ConsumerRecord[String, String]] = FlinkUtilsScala.apply()
        .createMetaStoreKafkaStream(env, tool, classOf[JastKafkaDeserializationSchema], kafkaTopics)
1
2

如果只需要获取Kafka数据则使用

val stream: DataStream[String] = FlinkUtilsScala.apply()
        .createKafkaStream(env, tool, classOf[SimpleStringSchema], waSourceFj1001)
1
2

详细代码:


import java.io.File
import cn.hutool.core.io.resource.{Resource, ResourceUtil}
import cn.hutool.core.util.StrUtil
import cn.hutool.system.SystemUtil
import com.jast.function.AlertProcessWindowFunction
import com.jast.schema.JastKafkaDeserializationSchema
import com.jast.util.FlinkUtilsScala
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.kafka.clients.consumer.ConsumerRecord


object AlertApplication {
  //配置文件
  private val envName = "alert.properties"

  def main(args: Array[String]): Unit = {
    //windows、mac系统默认为本地开发调试环境
    val isLocal: Boolean = SystemUtil.getOsInfo().isMac || SystemUtil.getOsInfo().isWindows

    //参数工具类
    var tool: ParameterTool = null

    if (isLocal) {
      //开发环境读取resouces目录下配置文件
      val resource: Resource = ResourceUtil.getResourceObj(envName)
      println("读取配置文件:" + resource.getUrl.getPath)
      tool = ParameterTool.fromPropertiesFile(resource.getUrl.getPath)
    } else {
      //linux环境读取conf目录下的配置文件
      println("读取配置文件:" + System.getProperty("user.dir") + File.separator + "conf" + File.separator + envName)
      tool = ParameterTool.fromPropertiesFile(System.getProperty("user.dir") + File.separator + "conf" + File.separator + envName)
    }
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    if (isLocal) {
      //设置windowsStateBackEnd
      env.setStateBackend(new FsStateBackend("file:////Users/mac/IdeaProjects/huairou-bigdata/state-backend"))
    }

    val kafkaTopics = tool.get("kafka.topics.alert")
    if (!StrUtil.isBlankIfStr(kafkaTopics)) {
      val stream: DataStream[ConsumerRecord[String, String]] = FlinkUtilsScala.apply()
        .createMetaStoreKafkaStream(env, tool, classOf[JastKafkaDeserializationSchema], kafkaTopics)
      stream.map(value => (_root_.scala.util.Random.nextInt(10), value))
        .keyBy(_._1)
        .timeWindow(Time.milliseconds(tool.getLong("flink.batch.time.milliseconds")))
        .process(new AlertProcessWindowFunction).name("alert process")
    }

    env.execute
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57

# 异常处理

# NullPointerException KafkaPartitionDiscoverer.getAllPartitionsForTopics

java.lang.NullPointerException
	at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:507)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
1
2
3
4
5
6
7
8
9
10

原因:当前消费的Topic在Kafka中没有。

解决方法:创建该Topic

上次更新: 2023/03/10, 16:49:38
Backpressured详细介绍
Flink操作MySQL

← Backpressured详细介绍 Flink操作MySQL→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式