Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • 安装Clickhouse集群
  • Clickhouse基础知识
  • Clickhouse-SQL操作
  • Clickhouse-副本
  • Clickhouse-分片集群
  • TODO-Clickhouse-Explain查看执行计划
  • Clickhouse-建表优化
  • Clickhouse-语法优化规则
  • Clickhouse-查询优化
  • Clickhouse-数据一致性
  • Clickhouse-物化视图
  • Clickhouse-使用Kafka表引擎
  • users xml配置文件详解
  • ClickHouse如何实现数据更新-ReplicatedReplacingMergeTree
    • ClickHouse-SQL使用
    • 使用ClickHouseSink写入数据
    • Mutations操作-数据的删除和修改
    • Clickhouse-每批次写入跨多个分区设置
    • 《ClickHouse教程》笔记
    Jast-zsh
    2023-03-10
    目录

    ClickHouse如何实现数据更新-ReplicatedReplacingMergeTree

    [toc]

    【摘要】 Clickhouse作为一个OLAP数据库,它对事务的支持非常有限。本文主要介绍通过ReplacingMergeTree来实现Clickhouse数据的更新、删除。

    Clickhouse作为一个OLAP数据库,它对事务的支持非常有限。

    Clickhouse提供了MUTATION操作(通过ALTER TABLE语句)来实现数据的更新、删除,但这是一种“较重”的操作,它与标准SQL语法中的UPDATE、DELETE不同,是异步执行的,对于批量数据不频繁的更新或删除比较有用,可参考https://altinity.com/blog/2018/10/16/updates-in-clickhouse。

    除了MUTATION操作,Clickhouse还可以通过CollapsingMergeTree、VersionedCollapsingMergeTree、ReplacingMergeTree结合具体业务数据结构来实现数据的更新、删除,这三种方式都通过INSERT语句插入最新的数据,新数据会“抵消”或“替换”掉老数据,但是“抵消”或“替换”都是发生在数据文件后台Merge时,也就是说,在Merge之前,新数据和老数据会同时存在。因此,我们需要在查询时做一些处理,避免查询到老数据。

    Clickhouse官方文档提供了使用CollapsingMergeTree、VersionedCollapsingMergeTree的指导,https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/collapsingmergetree/。相比于CollapsingMergeTree、VersionedCollapsingMergeTree需要标记位字段、版本字段,用ReplacingMergeTree来实现数据的更新删除会更加方便,这里着重介绍一下如何用ReplacingMergeTree来实现数据的更新删除。

    # 如何去重?

    • ReplaceMergeTree[ver] 参数:ver,版本列。版本列的类型为UInt*、Date或DateTime。可选参数。 合并的时候,ReplacingTree从所有相同主键的行中选择一行留下:如果ver未指定,选择最后一条。如果制定了ver列,选择ver值最大的版本。

    # 根据排序键去重

    建表语句

    DROP TABLE replacingMergeTreeDemo;
    CREATE TABLE replacingMergeTreeDemo
    (
        UserID UInt32,
        CounterID UInt32,
        UserName String,
        EventDate Date
    ) ENGINE = ReplacingMergeTree()
    ORDER BY (UserID, CounterID)
    PRIMARY KEY (UserID);
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    如果同一order key散落到了不同的分区、不同的分片中,去重会失效;换言之,只能自动合并同一分区,同一分片的相同ID;

    # 根据排序键去重-并指定版本

    建表语句

    DROP TABLE replacingMergeTreeDemo;
    CREATE TABLE replacingMergeTreeDemo
    (
        UserID UInt32,
        CounterID UInt32,
        UserName String,
        EventDate Date
    ) ENGINE = ReplacingMergeTree(EventDate)
    ORDER BY (UserID, CounterID)
    PRIMARY KEY (UserID);
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    UserID,CounterID 相同,保留EventDate最大的版本数据

    # 实际应用场景介绍

    我们假设一个需要频繁数据更新的场景,如某市用户用电量的统计,我们知道,用户的用电量每分每秒都有可能发生变化,所以会涉及到数据频繁的更新。首先,创建一张表来记录某市所有用户的用电量。

    CREATE TABLE IF NOT EXISTS default.PowerConsumption_local ON CLUSTER default_cluster
    (
        User_ID             UInt64                              COMMENT '用户ID',
        Record_Time         DateTime    DEFAULT toDateTime(0)   COMMENT '电量记录时间',
        District_Code       UInt8                               COMMENT '用户所在行政区编码',
        Address             String                              COMMENT '用户地址',
        Power               UInt64                              COMMENT '用电量',
        Deleted             BOOLEAN     DEFAULT 0               COMMENT '数据是否被删除'
    )
    ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/default.PowerConsumption_local/{shard}', '{replica}', Record_Time)
    ORDER BY (User_ID, Address)
    PARTITION BY District_Code;
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    CREATE TABLE default.PowerConsumption ON CLUSTER default_cluster AS default.PowerConsumption_local
    ENGINE = Distributed(default_cluster, default, PowerConsumption_local, rand());
    
    1
    2

    PowerConsumption_local为本地表,PowerConsumption为对应的分布式表。其中PowerConsumption_local使用ReplicatedReplacingMergeTree表引擎,第三个参数‘Record_Time’表示相同主键的多条数据,只会保留Record_Time最大的一条,我们正是利用ReplacingMergeTree的这一特性来实现数据的更新删除。因此,在选择主键时,我们需要确保主键唯一。这里我们选择(User_ID, Address)来作为主键,因为用户ID加上用户的地址可以确定唯一的一个电表,不会出现第二个相同的电表,所以对于某个电表多条数据,只会保留电量记录时间最新的一条。

    然后我们向表中插入10条数据:

    INSERT INTO default.PowerConsumption VALUES (0, '2021-10-30 12:00:00', 3, 'Yanta', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (1, '2021-10-30 12:10:00', 2, 'Beilin', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (2, '2021-10-30 12:15:00', 1, 'Weiyang', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (3, '2021-10-30 12:18:00', 1, 'Gaoxin', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (4, '2021-10-30 12:23:00', 2, 'Qujiang', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (5, '2021-10-30 12:43:00', 3, 'Baqiao', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (6, '2021-10-30 12:45:00', 1, 'Lianhu', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (7, '2021-10-30 12:46:00', 3, 'Changan', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (8, '2021-10-30 12:55:00', 1, 'Qianhan', rand64() % 1000 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (9, '2021-10-30 12:57:00', 4, 'Fengdong', rand64() % 1000 + 1, 0);
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    表中数据如图所示:

    img

    假如现在我们要行政区编码为1的所有用户数据都需要更新,我们插入最新的数据:

    INSERT INTO default.PowerConsumption VALUES (2, now(), 1, 'Weiyang', rand64() % 100 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (3, now(), 1, 'Gaoxin', rand64() % 100 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (6, now(), 1, 'Lianhu', rand64() % 100 + 1, 0);
    INSERT INTO default.PowerConsumption VALUES (8, now(), 1, 'Qianhan', rand64() % 100 + 1, 0);
    
    1
    2
    3
    4

    插入最新数据后,表中数据如图所示:

    img

    可以看到,此时新插入的数据与老数据同时存在于表中,因为后台数据文件还没有进行Merge,“替换”还没有发生,这时就需要对查询语句做一些处理来过滤掉老数据,函数argMax(a, b)可以按照b的最大值取a的值,所以通过如下查询语句就可以只获取到最新数据:

    SELECT
        User_ID,
        max(Record_Time) AS R_Time,
        District_Code,
        Address,
        argMax(Power, Record_Time) AS Power,
        argMax(Deleted, Record_Time) AS Deleted
    FROM default.PowerConsumption
    GROUP BY
        User_ID,
        Address,
        District_Code
    HAVING Deleted = 0;
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    查询结果如下图:

    img

    为了更方便我们查询,这里可以创建一个视图:

    CREATE VIEW PowerConsumption_view ON CLUSTER default_cluster AS
    SELECT
        User_ID,
        max(Record_Time) AS R_Time,
        District_Code,
        Address,
        argMax(Power, Record_Time) AS Power,
        argMax(Deleted, Record_Time) AS Deleted
    FROM default.PowerConsumption
    GROUP BY
        User_ID,
        Address,
        District_Code
    HAVING Deleted = 0;
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

    通过该视图,可以查询到最新的数据:

    img

    假如现在我们又需要删除用户ID为0的数据,我们需要插入一条User_ID字段为0,Deleted字段为1的数据:

    INSERT INTO default.PowerConsumption VALUES (0, now(), 3, 'Yanta', null, 1);
    
    1

    查询视图,发现User_ID为0的数据已经查询不到了:

    img

    通过如上方法,我们可以实现Clickhouse数据的更新、删除,就好像在使用OLTP数据库一样,但我们应该清楚,实际上老数据真正的删除是在数据文件Merge时发生的,只有在Merge后,老数据才会真正物理意义上的删除掉。

    # 手动触发Merge

    执行下面SQL语句也会触发Merge,删除历史数据

    optimize table msdp_main.user_only ON CLUSTER default_cluster;
    
    1

    参考文章:

    https://blog.csdn.net/lyq7269/article/details/115166036

    https://bbs.huaweicloud.com/blogs/308276

    https://zhuanlan.zhihu.com/p/429357015

    上次更新: 2023/03/10, 17:30:33
    users xml配置文件详解
    ClickHouse-SQL使用

    ← users xml配置文件详解 ClickHouse-SQL使用→

    最近更新
    01
    Linux可视化监控
    02-26
    02
    Maven私服搭建
    02-26
    03
    当ElasticSearch时间字段设置多个格式到底是用的哪个?
    01-19
    更多文章>
    Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式