Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • 案例:Flume消费Kafka数据保存Hive
    • 场景
    • 创建Hive表
    • 编写Interceptor
      • 编写Flume配置文件
    • 启动Flume收集数据
    • 查看数据
  • 《Flume教程》笔记
Jast-zsh
2022-03-30
目录

案例:Flume消费Kafka数据保存Hive

# Flume消费Kafka数据保存到Hive

# 场景

通过Flume消费Kafka中数据,保存数据到ODS层,数据存储时标记消费时的元信息

# 创建Hive表

orc存储,snappy压缩,开启事务

ORC事务表

  • 只能是内部表
  • 必须创建桶
create TABLE hr.ods_internetbar_data
(
  k_topic string ,
  k_data string,
  k_partition int,
  k_offset int,
  k_key string,
  current_time bigint
)
  partitioned by (pt_dt int)
CLUSTERED BY(k_partition) into 5 buckets
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n' 
STORED AS orc 
TBLPROPERTIES('orc.compress'='SNAPPY','transactional'='true');
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 编写Interceptor

思路:我们将Kafka中的元信息(topic名称,partition等)通过Interceptor转换成以'\t'分隔的数据,然后将数据保存到Hive

拦截器com.jast.flume.ETLInterceptor代码如下:

package com.jast.flume;

import cn.hutool.json.JSONUtil;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * @author Jast
 * @description 自定义拦截器
 * @date 2022-03-17 09:20
 */
public class ETLInterceptor implements Interceptor {

    private final String separator = "\t";
    private final String defaultValue = "";
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        //拦截kafka时,这里获取到的Head,是Kafka元信息,包括key,topic,partition,offset等信息,e.g. {topic=ZW_WB_1010000009, partition=0, offset=5425705, key={"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"3e9fc439e42c4059b46e5d5664d08bf8","lineNum":17,"subFullLinkId":"77dc3a9680ee4d1caeb6719d87e8d3e5","totalCount":50}, timestamp=1648642188763}
        Map<String, String> headers = event.getHeaders();
        String topic = headers.getOrDefault("topic",defaultValue);
        String partition = headers.getOrDefault("partition",defaultValue);
        String offset = headers.getOrDefault("offset",defaultValue);
        String key = headers.getOrDefault("key",defaultValue);
        long currentTimeMillis = System.currentTimeMillis();
        //System.out.println("拦截器head:"+headers);
        byte[] body = event.getBody();
        String text = new String(body, StandardCharsets.UTF_8);
        if(JSONUtil.isJson(text)){
            text = topic+separator+text+separator+partition+separator+offset+separator+key+separator+currentTimeMillis;
            System.out.println("text:"+text);
            event.setBody(text.getBytes());
            return event;
        }
        //System.out.println("非json格式过滤掉");
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()){
            Event next = iterator.next();
            if(intercept(next)==null){
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {

    }


    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }
        @Override
        public void configure(Context context) {

        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.19</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

打包后生成flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar文件

将生成的包放到flume lib 目录中

  • 自己部署的Flume:

    需要先将打好的包放入到flume/lib文件夹下面

  • CDH版本Flume:

    需要先将打好的包放入到flume/lib文件夹下面

​ 具体的目录/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib/

cp flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/flume-ng/lib
1

# 编写Flume配置文件

需要注意的是a1.sinks.k1.serializer.fieldnames和a1.sources.r1.interceptors.i1.type

a1.sinks.k1.serializer.fieldnames:拦截器\t处理后在这里与Hive表的字段对应上

a1.sources.r1.interceptors.i1.type:拦截器配置

配置文件k2h.config内容如下

## 组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 每批次数量
a1.sources.r1.batchSize = 1000
# 将批写入通道之前的最长时间(毫秒)。每当达到第一个大小和时间时,将写入批。
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.60.16:9092
# kafka消费主题,多个主题逗号分隔
a1.sources.r1.kafka.topics=ZW_WB_1010000009
# 默认:PLAINTEXT,如果使用某种安全级别写入 Kafka,则设置为 SASL_PLAINTEXT、SASL_SSL 或 SSL。
a1.sources.r1.kafka.consumer.security.protocol=PLAINTEXT
# 配置拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jast.flume.ETLInterceptor$Builder

## channel1
# 基于文件存储的Channel
a1.channels.c1.type = file
# checkpoint 保存目录
a1.channels.c1.checkpointDir = /var/lib/hadoop-hdfs/flume/checkpoint/behavior1
# 用于存储日志文件的目录的逗号分隔列表。 在不同磁盘上使用多个目录可以提高文件通道性能
a1.channels.c1.dataDirs = /var/lib/hadoop-hdfs/flume/data/behavior1/
# 最大事务大小
a1.channels.c1.transactionCapacity=10000
# 通道最大容量
a1.channels.c1.capacity=1000000


a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://192.168.60.14:9083
# hive 库名
a1.sinks.k1.hive.database = hr
# hive表名称
a1.sinks.k1.hive.table = ods_internetbar_data
# 分区使用字段,转义字符介绍在文章下面
a1.sinks.k1.hive.partition = %y%m%d
# 在替换转义序列时使用本地时间(而不是事件标头中的时间戳)。
a1.sinks.k1.useLocalTimeStamp = false
# ============================================================
# e.g. 每6小时产生一个新文件,比如把24小时分成4份,假设现在的时间是15:40,如果这时候有新的日志到来,那么hdfs 会创建一个新的hdfs文件,文件名称是2015102012 ,就是15:40 是分布在12-18这个区间的
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
# collector1.sinks.sink_hdfs.hdfs.rollSize = 2048000000 
# collector1.sinks.sink_hdfs.hdfs.rollCount = 0 
# collector1.sinks.sink_hdfs.hdfs.rollInterval = 21600
# rollsize 的配置表示到2G大小的时候回滚到下一个文件,也就是到了这个时间 hdfs就会rename正在写的文件到已经写完
# rollInterval 的配置表示每个6小时回滚到下一个文件
# ============================================================


# 数据解析方式,目前支持DELIMITED(分隔符),JSON(每行为单层Json)和REGEX(正则表达式)
a1.sinks.k1.serializer = DELIMITED
# 数据字段分割符,如果要使用特殊字符需要添加双引号,例如"\t"
a1.sinks.k1.serializer.delimiter = "\t"
# 数据解析的正则表达式,每个字段的数据被解析成一个group
# a1.sinks.k1.serializer.regex = 
# (类型:字符)自定义底层 serde 使用的分隔符。 如果 serializer.fieldnames 中的字段与表列的顺序相同,serializer.delimiter 与 serializer.serdeSeparator 相同,并且 serializer.fieldnames 中的字段数小于或等于表数,则效率会有所提高 列,因为传入事件正文中的字段不需要重新排序以匹配表列的顺序。 对特殊字符使用单引号,例如“\t”。 确保输入字段不包含此字符。 注意:如果 serializer.delimiter 是单个字符,最好将其设置为相同的字符 
a1.sinks.k1.serializer.serdeSeparator = '\t'
# 输入数据字段到datahub字段的映射,以输入的顺序标示字段,如果要跳过某个字段, 不指定列名即可,例如 c1,c2,,c3,表示将输入数据的第一、二、四字段和hive的c1,c2,c3字段进行匹配。
a1.sinks.k1.serializer.fieldnames = k_topic,k_data,k_partition,k_offset,k_key,current_time
# 单个配置单元事务中写入配置单元的最大事件数,默认15000
a1.sinks.k1.batchSize=10000


## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74

上面配置中我们使用到了转义序列a1.sinks.k1.hive.partition = %y%m%d

支持的转义序列:

Alias Description
%{host} Substitute value of event header named “host”. Arbitrary header names are supported.
%t Unix time in milliseconds
%a locale’s short weekday name (Mon, Tue, ...)
%A locale’s full weekday name (Monday, Tuesday, ...)
%b locale’s short month name (Jan, Feb, ...)
%B locale’s long month name (January, February, ...)
%c locale’s date and time (Thu Mar 3 23:05:25 2005)
%d day of month (01)
%D date; same as %m/%d/%y
%H hour (00..23)
%I hour (01..12)
%j day of year (001..366)
%k hour ( 0..23)
%m month (01..12)
%M minute (00..59)
%p locale’s equivalent of am or pm
%s seconds since 1970-01-01 00:00:00 UTC
%S second (00..59)
%y last two digits of year (00..99)
%Y year (2010)
%z +hhmm numeric timezone (for example, -0400)

# 启动Flume收集数据

 flume-ng agent --conf-file k2h.config --name a1
1

# 查看数据

select k_topic,k_partition,k_offset,k_key,current_time,length(k_data) from ods_internetbar_data limit 10;
----

ZW_WB_1010000009        0       5441712 {"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":24,"subFullLinkId":"c2e3704534e745c0a779b16f059e6411","totalCount":50}    1648644001736   52105
ZW_WB_1010000009        0       5441713 {"dataType":"ZW_WB_","fileName":"yc_1644913124_11086","fullLinkId":"f3bc635822ca4af099755b9b339e8e4c","lineNum":25,"subFullLinkId":"c0cec18006ce40848ea3a0494a991999","totalCount":50}    1648644001736   35837
1
2
3
4
5

这里使用length(k_data)是为了方便展示,该字段值太大了

上次更新: 2023/03/10, 16:49:38
最近更新
01
Linux可视化监控
02-26
02
Maven私服搭建
02-26
03
当ElasticSearch时间字段设置多个格式到底是用的哪个?
01-19
更多文章>
Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式