Jast-zsh
2022-04-01

# Working with MySQL in Flink

[toc]

# Reading and Writing MySQL inside a Function

Example, using a ProcessWindowFunction:

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.concurrent.TimeUnit
import cn.hutool.core.date.DateUtil
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord


@SerialVersionUID(1L)
class AlertProcessWindowFunction extends ProcessWindowFunction[(Int, ConsumerRecord[String, String]), (String, Int), Int, TimeWindow] {

  /**
   * MySQL connection
   */
  private var connection: Connection = null


  private var ps: PreparedStatement = null

  override def open(parameters: Configuration): Unit = {

    super.open(parameters)

    //================================================================
    // Read the global job parameters (set on the execution config by the driver)
    val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    //================================================================
    // MySQL connection settings
    val url = params.getRequired("mysql.jdbc.url")
    val username = params.getRequired("mysql.username")
    val password = params.getRequired("mysql.password")
    println("Initializing MySQL connection: " + url + "\t" + username + "\t" + password)
    // Create the MySQL connection
    connection = DriverManager.getConnection(url, username, password)
    //================================================================
    // Write a row to MySQL
    var pstm: PreparedStatement = null
    try {
      pstm = connection.prepareStatement("INSERT INTO alert (data_type, contrast_table) VALUES (?, ?)")
      pstm.setString(1, "Test type")
      pstm.setString(2, "Test table " + DateUtil.date())
      pstm.executeUpdate
    } finally {
      if (pstm != null) {
        pstm.close()
      }
    }
    println("Row written successfully")
    //================================================================
    // Read rows from MySQL
    ps = connection.prepareStatement("select data_type,contrast_table from alert")
    val resultSet = ps.executeQuery()
    while (resultSet.next()) {
      val tuple = Tuple2.apply(resultSet.getString(1), resultSet.getString(2))
      println("Read row: " + tuple._1 + "," + tuple._2)
    }
    //================================================================

  }

  override def process(key: Int, context: Context, elements: Iterable[(Int, ConsumerRecord[String, String])], out: Collector[(String, Int)]): Unit = {

    elements.foreach(element => {
      println(element._2.topic())
      println(element._2.partition())
      println(element._2.value())
      TimeUnit.SECONDS.sleep(10) // artificial delay kept from the original example
    })
    out.collect(("facebook", 0))

  }

  override def close(): Unit = {
    super.close()
    // Close the MySQL connection
    connection.close()
  }
}
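
For reference, here is a minimal driver sketch (not from the original post). It shows how the function above receives its mysql.* settings through the global job parameters and how it is applied on a keyed window. The properties file name, the window size, and the `sourceStream` placeholder for the Kafka source are assumptions.

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.kafka.clients.consumer.ConsumerRecord

object AlertJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Hypothetical properties file holding mysql.jdbc.url / mysql.username / mysql.password
    val params = ParameterTool.fromPropertiesFile("config.properties")
    // Makes the parameters visible inside open() via getGlobalJobParameters
    env.getConfig.setGlobalJobParameters(params)

    // Placeholder for the real Kafka source producing (Int, ConsumerRecord[String, String]) tuples
    val sourceStream: DataStream[(Int, ConsumerRecord[String, String])] = ???

    sourceStream
      .keyBy(_._1)
      .timeWindow(Time.minutes(1))
      .process(new AlertProcessWindowFunction)
      .print()

    env.execute("alert-job-sketch")
  }
}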

# Saving with a MySQL Sink

  • MysqlSink

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// The ActivityBean type parameter of RichSinkFunction<ActivityBean> is the sink's input type;
// ActivityBean is a custom entity class (its assumed shape is sketched after the usage snippet below).
public class MysqlSink extends RichSinkFunction<ActivityBean> {

    private transient Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Create the MySQL connection
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8", "root", "123456");

    }

    @Override
    public void invoke(ActivityBean bean, Context context) throws Exception {

        PreparedStatement pstm = null;
        try {
            // Upsert: insert the counter, or update it if the key already exists
            pstm = connection.prepareStatement(
                    "INSERT INTO t_activity_counts (aid, event_type, counts) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE counts = ?");

            pstm.setString(1, bean.aid);
            pstm.setInt(2, bean.eventType);
            pstm.setInt(3, bean.count);
            pstm.setInt(4, bean.count);

            pstm.executeUpdate();
        } finally {
            if(pstm != null) {
                pstm.close();
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Close the connection
        connection.close();
    }
}

  • Usage

    stream.map(value => {
      // build an ActivityBean from the incoming record (field mapping omitted in the post)
      val v = new ActivityBean
      v
    }).addSink(new MysqlSink)
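
The post never shows the ActivityBean class itself. As a rough stand-in, the shape assumed by the sink and the usage snippet above is a bean with `aid`, `eventType` and `count` members; a minimal Scala sketch follows (in the original Java project this would simply be a plain POJO with public fields of the same names).

// Hypothetical stand-in for the ActivityBean entity the examples assume but never define.
class ActivityBean extends Serializable {
  var aid: String = _      // activity id
  var eventType: Int = 0   // event type code
  var count: Int = 0       // aggregated count written to the counts column
}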

# MySQL Two-Phase Commit Transactions

The two-phase commit sink exists to guarantee that the data produced between the previous successful checkpoint and the next one reaches MySQL exactly once. With a plain sink, if the job fails partway through, the records written after the previous successful checkpoint are already stored in MySQL even though they are not covered by any completed checkpoint; when the job restarts from that checkpoint, the same records are written to MySQL again, causing duplicates. The two listings below show a TwoPhaseCommitSinkFunction backed by a Druid connection pool; a minimal job-wiring sketch follows them.

  • MySqlTwoPhaseCommitSink

    package com.jast.flink.day07;
    
    import com.jast.flink.util.DruidConnectionPool;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeutils.base.VoidSerializer;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
    import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
    
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    
    public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<Tuple2<String, Integer>, MySqlTwoPhaseCommitSink.ConnectionState, Void> {
    
        public MySqlTwoPhaseCommitSink() {
            super(new KryoSerializer<>(ConnectionState.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
        }
    
        @Override
        protected ConnectionState beginTransaction() throws Exception {
            // Start a new transaction: take a connection from the pool with auto-commit off
            System.out.println("=====> beginTransaction... ");
            //Class.forName("com.mysql.jdbc.Driver");
            //Connection conn = DriverManager.getConnection("jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8", "root", "123456");
            Connection connection = DruidConnectionPool.getConnection();
            connection.setAutoCommit(false);
            return new ConnectionState(connection);
        }
    
        @Override
        protected void invoke(ConnectionState connectionState, Tuple2<String, Integer> value, Context context) throws Exception {
            // Write inside the open transaction; the data only becomes visible on commit
            Connection connection = connectionState.connection;
            PreparedStatement pstm = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = ?");
            pstm.setString(1, value.f0);
            pstm.setInt(2, value.f1);
            pstm.setInt(3, value.f1);
            pstm.executeUpdate();
            pstm.close();
        }
    
        @Override
        protected void preCommit(ConnectionState connectionState) throws Exception {
            // Called when a checkpoint starts; nothing needs to be flushed here
            System.out.println("=====> preCommit... " + connectionState);
        }
    
        @Override
        protected void commit(ConnectionState connectionState) {
            // Called once the checkpoint has completed: make the writes visible
            System.out.println("=====> commit... ");
            Connection connection = connectionState.connection;
            try {
                connection.commit();
                connection.close();
            } catch (SQLException e) {
                throw new RuntimeException("Failed to commit transaction", e);
            }
        }
    
        @Override
        protected void abort(ConnectionState connectionState) {
            // Called when the checkpoint fails or the job is cancelled: discard the writes
            System.out.println("=====> abort... ");
            Connection connection = connectionState.connection;
            try {
                connection.rollback();
                connection.close();
            } catch (SQLException e) {
                throw new RuntimeException("Failed to roll back transaction", e);
            }
        }
    
        static class ConnectionState {
    
            private final transient Connection connection;
    
            ConnectionState(Connection connection) {
                this.connection = connection;
            }
        }
    }
    
  • DruidConnectionPool

    package com.jast.flink.util;
    
    import com.alibaba.druid.pool.DruidDataSourceFactory;
    
    import javax.sql.DataSource;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Properties;
    
    public class DruidConnectionPool {
    
        private static DataSource dataSource = null;
    
        private static final Properties props = new Properties();
    
        static {
    
            props.put("driverClassName", "com.mysql.jdbc.Driver");
            props.put("url", "jdbc:mysql://172.16.200.101:3306/bigdata?characterEncoding=UTF-8");
            props.put("username", "root");
            props.put("password", "123456");
            try {
                dataSource = DruidDataSourceFactory.createDataSource(props);
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
        private DruidConnectionPool() {
        }
    
        public static Connection getConnection() throws SQLException {
            return dataSource.getConnection();
        }
    
    
    }
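
To wire the sink into a job, a minimal sketch follows (not from the original post; the checkpoint interval and the `wordCounts` placeholder stream are assumptions). The important point is that TwoPhaseCommitSinkFunction only provides exactly-once guarantees when checkpointing is enabled, because preCommit/commit are driven by the checkpoint lifecycle.

import com.jast.flink.day07.MySqlTwoPhaseCommitSink
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala._

object TwoPhaseCommitJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // commit() is only called after a checkpoint completes, so checkpointing must be on
    env.enableCheckpointing(30000)

    // Placeholder for the real word-count stream of Flink Tuple2 values
    val wordCounts: DataStream[Tuple2[String, Integer]] = ???

    wordCounts.addSink(new MySqlTwoPhaseCommitSink)

    env.execute("two-phase-commit-sketch")
  }
}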
    
    

# Periodically Refreshing MySQL Data inside a Function

Requirement: query MySQL on a schedule so that the fetched data can be used when evaluating records in the stream.

  • Only the scheduled-fetch logic is shown; a hypothetical sketch of how the refreshed map could be used inside process() follows the listing.
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.kafka.clients.consumer.ConsumerRecord

class AlertProcessWindowFunction extends ProcessWindowFunction[(Int, ConsumerRecord[String, String]), (String, Int), Int, TimeWindow] {

  /**
   * MySQL connection
   */
  private var connection: Connection = null

  // Alert rules refreshed from MySQL; written by the scheduler thread, read by the task thread
  @volatile private var alertMap: scala.collection.immutable.Map[String, List[(String, String, String, String)]] = null

  private var ps: PreparedStatement = null

  // Single-threaded scheduler that refreshes the rules periodically
  private var executor: ScheduledExecutorService = null

  override def open(parameters: Configuration): Unit = {

    super.open(parameters)

    //================================================================
    // Read the global job parameters
    val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters.asInstanceOf[ParameterTool]
    //================================================================
    // MySQL connection settings
    val url = params.getRequired("mysql.jdbc.url")
    val username = params.getRequired("mysql.username")
    val password = params.getRequired("mysql.password")
    println("Initializing MySQL connection: " + url + "\t" + username + "\t" + password)
    // Create the MySQL connection
    connection = DriverManager.getConnection(url, username, password)

    // Query MySQL on a schedule and refresh the in-memory rules every 20 seconds
    executor = Executors.newScheduledThreadPool(1)
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = {
        //================================================================
        // Read the alert rules from MySQL
        ps = connection.prepareStatement("select data_type,contrast_table,contrast_field,business_type from alert")
        val resultSet = ps.executeQuery()
        val list = new util.ArrayList[Tuple2[String, Tuple4[String, String, String, String]]]
        while (resultSet.next()) {
          val tuple = Tuple4.apply(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3), resultSet.getString(4))
          val tuple1 = Tuple2.apply(resultSet.getString(1), tuple)
          list.add(tuple1)
          println("Read row: " + tuple._1 + "," + tuple._2)
        }
        resultSet.close()
        ps.close()
        import scala.collection.JavaConverters._
        val list1 = list.asScala.toList
        // Convert the alert rules to a Map keyed by contrast_table for later lookups
        alertMap = list1.map(x => (x._2._2, x._2)).groupBy(_._1).map(x => (x._1, x._2.map(_._2)))
        println("alertMap:" + alertMap)
        //================================================================
      }
    }, 0, 20, TimeUnit.SECONDS)

  }

  override def process(key: Int, context: Context, elements: Iterable[(Int, ConsumerRecord[String, String])], out: Collector[(String, Int)]): Unit = {

    out.collect(("facebook", 0))

  }

  override def close(): Unit = {
    super.close()
    // Stop the refresh thread and close the MySQL connection
    if (executor != null) {
      executor.shutdown()
    }
    connection.close()
  }
}
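
The post leaves process() as a stub. Purely as an illustration of how the periodically refreshed alertMap might be consulted, a hypothetical replacement for the process() method inside the class above could look like this (the assumption that a record's Kafka topic matches the contrast_table lookup key is mine, not the post's):

  // Hypothetical: emit one result per record that has at least one matching alert rule,
  // assuming the Kafka topic name is used as the contrast_table lookup key.
  override def process(key: Int, context: Context, elements: Iterable[(Int, ConsumerRecord[String, String])], out: Collector[(String, Int)]): Unit = {
    // the map may not be populated yet when the first window fires
    val rules = Option(alertMap).getOrElse(Map.empty[String, List[(String, String, String, String)]])
    elements.foreach { case (_, record) =>
      rules.get(record.topic()).foreach { matched =>
        out.collect((record.topic(), matched.size))
      }
    }
  }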

Last updated: 2023/03/10, 16:49:38