Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • Flink Standalone集群安装
  • Flink启动脚本
  • keyBy数据分配计算方法
  • Flink流处理高级操作之时间语义
  • Flink流处理之窗口Window
  • Flink流处理之ProcessFunction
    • 基本概念
      • 8个ProcessFunction
    • KeyedProcessFunction 使用案例
      • 实现功能
      • 实现代码
      • 测试执行
    • CoProcessFunction
    • BroadcastProcessFunction
  • Chain分隔
  • Backpressured详细介绍
  • Flink消费Kafka
  • Flink操作MySQL
  • Flink自定义Connector-TableApi SQL
  • Flink使用异常处理
  • FlinkCDC
  • LinkageError异常处理
  • Flink日志Log4j发送到Kafka
  • 《Flink教程》笔记
Jast-zsh
2022-04-06
目录

Flink流处理之ProcessFunction

# ProcessFunction

[toc]

# 基本概念

ProcessFunction函数是一个低级的流处理函数,可以将其看做一个具有Keyed状态和定时访问权限的FlatMapFunction,它通过调用输入数据流中收到的每个事件(元素)来处理事件(元素)。

  • 转换算子是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如我们常用的MapFunction转换算子就无法访问时间戳或者当前事件的事件时间。
  • 基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。
  • Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。

# 8个ProcessFunction

  • Flink提供了8个Process Function:
    • **1)**ProcessFunction dataStream
    • **2)**KeyedProcessFunction 用于KeyedStream,keyBy之后的流处理
    • **3)**CoProcessFunction 用于connect连接的流
    • **4)**ProcessJoinFunction 用于join流操作
    • **5)**BroadcastProcessFunction 用于广播
    • **6)**KeyedBroadcastProcessFunction keyBy之后的广播
    • **7)**ProcessWindowFunction 窗口增量聚合
    • **8)**ProcessAllWindowFunction 全窗口聚合

# KeyedProcessFunction 使用案例

# 实现功能

通过socketTextStream读取9999端口数据,统计在一定时间内不同类型商品的销售总额度,如果持续销售额度为0,则执行定时器通知老板,是不是卖某种类型商品的员工偷懒了(只做功能演示,根据个人业务来使用,比如统计UV等操作)

# 实现代码

package com.jast.flink.processfunction

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * KeyedProcessFunction Demo
 */
object KeyedProcessFunctionDemo {
  
  def main(args: Array[String]): Unit = {
    //创建Stream环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //通过9999socket接口接收数据
    val stream: DataStream[String] = env.socketTextStream("localhost", 9999)

    //数据接收格式为:商品,价格
    //e.g. 帽子,38
    //e.g. 衣服,199
    val typeAndData: DataStream[(String, String)] = stream.map(x => (x.split(",")(0), x.split(",")(1)))
      .setParallelism(4) //设置并行度为4


    typeAndData.keyBy(0) //根据key(商品)进行聚合
      .process(new MyprocessFunction()) //调用自定义KeyedProcessFunction函数
      .print("结果") // 输出函数返回结果,前面加上"结果"
    env.execute()
  }

  /**
   * 实现:
   *    根据key分类,统计每个key进来的数据量,定期统计数量,如果数量为0则预警
   */
  class MyprocessFunction extends  KeyedProcessFunction[Tuple,(String,String),String]{

    //统计间隔时间
    val delayTime : Long = 1000 * 10

    /**
     * 状态存储变量
     * cjcount 自定义名称
     */
    lazy val state : ValueState[(String,Long)] = getRuntimeContext.getState[(String,Long)](new ValueStateDescriptor[(String, Long)]("cjcount",classOf[Tuple2[String,Long]]))

    /**
     * 定时器
     * @name onTimer
     * @date 2022/4/6 上午11:13
     * @return void
     * @param timestamp 定时器出发的时间
     * @param ctx
     * @param out
     * @author Jast
    */
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, (String, String), String]#OnTimerContext, out: Collector[String]): Unit = {

      printf("定时器触发,时间为:%d,状态为:%s,key为:%s\n",timestamp,state.value(),ctx.getCurrentKey)
      if(state.value()._2==0){
        //该时间段数据为0,进行预警
        printf("类型为:%s,数据为0,预警\n",state.value()._1)
      }
      //定期数据统计完成后,清零
      state.update(state.value()._1,0)
      //再次注册定时器执行
      val currentTime: Long = ctx.timerService().currentProcessingTime()
      ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
    }

    /**
     * 处理数据方法
     * @name processElement
     * @date 2022/4/6 上午11:12
     * @return void
     * @param value 数据的数据值
     * @param ctx 存储的上下文信息
     * @param out 返回数据格式
     * @author Jast
    */
    override def processElement(value: (String, String), ctx: KeyedProcessFunction[Tuple, (String, String), String]#Context, out: Collector[String]): Unit = {
      printf("状态值:%s,state是否为空:%s\n",state.value(),(state.value()==null))
      if(state.value() == null){
        //当前Key首次执行状态值为空,进行初始化赋值
        //获取时间
        val currentTime: Long = ctx.timerService().currentProcessingTime()
        //注册定时器 delayTime 秒后触发(执行onTimer方法)
        ctx.timerService().registerProcessingTimeTimer(currentTime + delayTime)
        printf("定时器注册时间:%d\n",currentTime + delayTime)
        // 更新state值
        state.update(value._1,value._2.toInt)
      } else{
        //统计数据
        val key: String = state.value()._1
        var count: Long = state.value()._2
        count += value._2.toInt
        //更新state值
        state.update((key,count))
      }

      //返回任务的名称,并附加子任务指示符,例如“MyTask(3/6)”,
      println(getRuntimeContext.getTaskNameWithSubtasks+"->"+value)
      printf("状态值:%s\n",state.value())
      //返回处理后结果
      out.collect("处理后返回数据->"+value)
    }
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113

# 测试执行

(1) 在终端启动端口准备发送数据使用

mac@Mac ~ % nc -lk 9999
1

**(2)**启动程序

**(3)**发送数据

mac@Mac ~ % nc -lk 9999
衣服,123
裤子,1234
衣服,1
1
2
3
4

**(4)**控制台输出内容

状态值:null,state是否为空:true
定时器注册时间:1649213908697
KeyedProcess -> Sink: Print to Std. Out (8/8)->(衣服,123)
状态值:(衣服,123)
结果:8> 处理后返回数据->(衣服,123)
状态值:null,state是否为空:true
定时器注册时间:1649213911821
KeyedProcess -> Sink: Print to Std. Out (4/8)->(裤子,1234)
状态值:(裤子,1234)
结果:4> 处理后返回数据->(裤子,1234)
状态值:(衣服,123),state是否为空:false
KeyedProcess -> Sink: Print to Std. Out (8/8)->(衣服,1)
状态值:(衣服,124)
结果:8> 处理后返回数据->(衣服,1)
定时器触发,时间为:1649213908697,状态为:(衣服,124),key为:(衣服)
定时器触发,时间为:1649213911821,状态为:(裤子,1234),key为:(裤子)
定时器触发,时间为:1649213918708,状态为:(衣服,0),key为:(衣服)
类型为:衣服,数据为0,预警
定时器触发,时间为:1649213921832,状态为:(裤子,0),key为:(裤子)
类型为:裤子,数据为0,预警
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

可以看到我们输入的内容,根据类型统计输出

定时器触发,时间为:1649213908697,状态为:(衣服,124),key为:(衣服)
定时器触发,时间为:1649213911821,状态为:(裤子,1234),key为:(裤子)
1
2

随后我们不进行数据输入,定时器触发进行预警操作

定时器触发,时间为:1649213918708,状态为:(衣服,0),key为:(衣服)
类型为:衣服,数据为0,预警
定时器触发,时间为:1649213921832,状态为:(裤子,0),key为:(裤子)
类型为:裤子,数据为0,预警
1
2
3
4

# CoProcessFunction

http://www.manongjc.com/detail/23-umgdbbcybtgvihr.html

# BroadcastProcessFunction

https://cloud.tencent.com/developer/article/1983497

上次更新: 2023/03/10, 17:00:47
Flink流处理之窗口Window
Chain分隔

← Flink流处理之窗口Window Chain分隔→

最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式