Jast blog Jast blog
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档

Jast-zsh

如果你知道你要去哪里,全世界都会给你让路。
首页
  • 《Ambari自定义开发教程》笔记
  • 《CDH教程》笔记
  • 《ClickHouse教程》笔记
  • 《HDFS教程》笔记
  • 《DolphinScheduler教程》笔记
  • 《Hbase教程》笔记
  • 《Iceberg教程》笔记
  • 《Hive教程》笔记
  • 《Flume教程》笔记
  • 《Kafka教程》笔记
  • 《Impala教程》笔记
  • 《Hue教程》笔记
  • 《Spark教程》笔记
  • 《Flink教程》笔记
  • 《Phoenix教程》笔记
  • 《ElasticSearch教程》笔记
  • 《Kylin教程》笔记
  • 《Storm教程》笔记
  • 《Yarn教程》笔记
  • 《Presto教程》笔记
  • 《图数据库教程》笔记
  • 《Kerberos教程》笔记
  • 《Maxwell教程》笔记
  • 《MinIO教程》笔记
  • 《DataX教程》笔记
  • 《Superset教程》笔记
  • 《IOTDB教程》笔记
  • 《大数据相关》笔记
  • 《PaddleNLP教程》笔记
  • 《Nginx教程》笔记
  • 《Java技术文档》
  • 《Maven教程》笔记
  • 《IDEA使用教程》
  • 学习
  • 面试
  • 心情杂货
  • 实用技巧
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
  • FlinkClient使用Iceberg
  • Iceberg基于Hadoop存储数据格式介绍
  • Kafka数据写入Iceberg
  • Flink代码读写Iceberg
    • 数据湖Iceberg-简介(1)
    • 数据湖Iceberg-存储结构(2)
    • 数据湖Iceberg-Hive集成Iceberg(3)
    • 数据湖Iceberg-SparkSQL集成(4)
    • 数据湖Iceberg-FlinkSQL集成(5)
    • 数据湖Iceberg-FlinkSQL-kafka类型表数据无法成功写入(6)
    • 数据湖Iceberg-Flink DataFrame集成(7)
    • 《Iceberg教程》笔记
    Jast-zsh
    2023-03-10
    目录

    Flink代码读写Iceberg

    # 创建基础信息

    CREATE CATALOG hadoop_catalog WITH (
      'type'='iceberg',
      'catalog-type'='hadoop',
      'warehouse'='hdfs:///user/hive/warehouse/iceberg_hadoop_catalog',
      'property-version'='1'
    );
    
    1
    2
    3
    4
    5
    6

    创建数据库

     create database hadoop_catalog.iceberg_db;
    
    1

    创建表

    CREATE TABLE `hadoop_catalog`.`iceberg_db`.`flink_read_1` (
    id BIGINT COMMENT 'unique id',
    data STRING
    );
    
    1
    2
    3
    4

    写入数据

    INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` VALUES (1, 'a');
    INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` VALUES (2, 'a');
    INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` VALUES (3, 'a');
    INSERT INTO `hadoop_catalog`.`iceberg_db`.`flink_read_1` VALUES (4, 'a');
    
    1
    2
    3
    4

    # Flink追加写入数据

    创建写入表

    create table `hadoop_catalog`.`iceberg_db`.`flink_write_1` like `hadoop_catalog`.`iceberg_db`.`flink_read_1`;
    
    1

    # Flink覆盖写入数据

    # 异常

    # 异常一

    Caused by: java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (265524090384035565, 3858118682089226033]
            at org.apache.iceberg.IncrementalDataTableScan.snapshotsWithin(IncrementalDataTableScan.java:121)
            at org.apache.iceberg.IncrementalDataTableScan.planFiles(IncrementalDataTableScan.java:73)
            at org.apache.iceberg.BaseTableScan.planTasks(BaseTableScan.java:204)
            at org.apache.iceberg.DataTableScan.planTasks(DataTableScan.java:30)
            at org.apache.iceberg.flink.source.FlinkSplitPlanner.planTasks(FlinkSplitPlanner.java:109)
            at org.apache.iceberg.flink.source.FlinkSplitPlanner.planInputSplits(FlinkSplitPlanner.java:41)
            at org.apache.iceberg.flink.source.StreamingMonitorFunction.monitorAndForwardSplits(StreamingMonitorFunction.java:143)
            at org.apache.iceberg.flink.source.StreamingMonitorFunction.run(StreamingMonitorFunction.java:121)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
            at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
            at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

    报错原因

    Flink还不能以 Streaming 的方式读取 Iceberg 的增量update/delete数据。

    上次更新: 2023/03/10, 20:58:04
    Kafka数据写入Iceberg
    数据湖Iceberg-简介(1)

    ← Kafka数据写入Iceberg 数据湖Iceberg-简介(1)→

    最近更新
    01
    Linux可视化监控
    02-26
    02
    Maven私服搭建
    02-26
    03
    当ElasticSearch时间字段设置多个格式到底是用的哪个?
    01-19
    更多文章>
    Theme by Vdoing | Copyright © 2016-2025 Jast-zsh | MIT License
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式