代码编织梦想

问题

我使用flink cdc同步mysql到mysql遇到了timestamp字段缺少八小时的问题。很少无语,flink ,cdc,debezium时区都设置了,没有任何效果!

分析

问题出现在mysql binlog身上!!!
因为默认mysql会使用UTC来存储binlog,你可以使用下方的sql验证:

mysqlbinlog --base64-output=DECODE-ROWS -v --start-datetime="2024-11-26 16:20:59" --stop-datetime="2024-11-26 16:30:59" /path/to/binlog-file

设置起止时间和binlog位置

而我们存储mysql数据的时候使用的时区大概率是上海,你也可以查看:

SELECT @@global.time_zone, @@session.time_zone;

如果都是system,而你就在中国大陆那就没错了

验证分析:

而这个时间相差你会发现只在同步增量数据的时候才出现!因为.startupOptions(StartupOptions.initial())会同步历史数据,这些都是从数据库读取的,所以两边都是上海时区就不会有问题!

解决

其实官方给了解决的方案,但是说的非常的模糊,如果对flink cdc不是很熟悉的朋友大概率会云里雾里!
这是官方的常见问题汇总:
以下是原话:
Q2: 使用 MySQL CDC,增量阶段读取出来的 timestamp 字段时区相差8小时,怎么回事呢?

#在解析binlog数据中的timestamp字段时,cdc 会使用到作业里配置的server-time-zone信息,也就是MySQL服务器的时区,如果这个时区没有和你的MySQL服务器时区一致,就会出现这个问题。

此外,如果是在DataStream作业中自定义列化器如 MyDeserializer implements DebeziumDeserializationSchema, 自定义的序列化器在解析 timestamp 类型数据时,需要参考下 RowDataDebeziumDeserializeSchema 中对 timestamp 类型的解析,用时给定的时区信息。


private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
        if (dbzObj instanceof Long) {
            switch (schema.name()) {
                case Timestamp.SCHEMA_NAME:
                    return TimestampData.fromEpochMillis((Long) dbzObj);
                case MicroTimestamp.SCHEMA_NAME:
                    long micro = (long) dbzObj;
                    return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                case NanoTimestamp.SCHEMA_NAME:
                    long nano = (long) dbzObj;
                    return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
            }
        }
        LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
        return TimestampData.fromLocalDateTime(localDateTime);
    }

其实意思很简单,就是自定义序列化器(实现接口CustomConverter)并且对timestamp字段进行单独的处理就可以!

总结!!!

关于序列化,flink有一个官方的序列化器,是debezium的,源码下载链接:
在这里插入图片描述
你只需要在这个方法里面手动修改时区就可以了!
在这里插入图片描述

注意:你要观察下按照你的环境版本timestamp字段映射的对象类型是不是 ZonedDateTime!!!

使用也很简单(好人做到底):

MySqlSource<DataChangeInfo> mySqlSource = MySqlSource.<DataChangeInfo>builder()
                .hostname("192.168.10.14")
                .port(3306)
                .databaseList("xcode")
                .tableList("xcode.temp_flink")
                .username("root")
                .password("123456")
//                .serverId("5401-5404")
                .debeziumProperties(getProperties())
                .deserializer(new MysqlDeserialization())
//                .scanNewlyAddedTableEnabled(true)
//                .includeSchemaChanges(true) // Configure here and output DDL events
                .startupOptions(StartupOptions.initial())
//                .serverTimeZone("Asia/Shanghai")
                .build();

// 关键代码在这里!!!!!!!!!
private static Properties getProperties() {
    Properties properties = new Properties();
    properties.setProperty("converters", "dateConverters");
    //这里!这里!!这里!!!(这是官方的,用上面的源码自己修改完填你的全路径)
    properties.setProperty("dateConverters.type", "io.debezium.connector.mysql.converters.MysqlDebeziumTimeConverter");
    properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
    properties.setProperty("dateConverters.format.time", "HH:mm:ss");
    properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
    properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
    // timestamp没用。。。
//        properties.setProperty("dateConverters.format.timestamp.zone", "UTC");
    //全局读写锁,可能会影响在线业务,跳过锁设置
    properties.setProperty("debezium.snapshot.locking.mode","none");
    properties.setProperty("include.schema.changes", "true");
    properties.setProperty("bigint.unsigned.handling.mode","long");
    properties.setProperty("decimal.handling.mode","double");
    return properties;
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/JGMa_TiMo/article/details/144062951

Flink SQL CDC 的实时增量同步数据-爱代码爱编程

问题导读:1、怎样实现基于 Flink SQL CDC 的数据同步方案? 2、CDC 是否需要保证顺序化消费? 3、GROUP BY 结果如何写到 Kafka ? 传统的数据同步方案与 Flink SQL CDC 解决方案业务系统经常会遇到需要更新数据到多个存储的需求。例如:一个订单系统刚刚开始只需要写入数据库即可完成业务使用。某天 BI 团队期望对数据

flink-cdc实时增量同步mysql数据到hbase_大数据技术派的博客-爱代码爱编程

本文首发于我的个人博客网站 等待下一个秋-Flink 什么是CDC? CDC是(Change Data Capture 变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据 或 数据表的插入INS

flink系列之:基于flink cdc2.0实现海量数据的实时同步和转换_勇敢羊羊在飞奔的博客-爱代码爱编程

Flink系列之:基于Flink CDC2.0实现海量数据的实时同步和转换 一、CDC技术 二、Flink CDC技术 三、传统数据集成方案的痛点 1.传统数据入仓架构1.0

【大数据】flink cdc 实时同步mysql数据_flink cdc mysql-爱代码爱编程

目录 一、前言 二、Flink CDC介绍 2.1 什么是Flink CDC 2.2 Flink CDC 特点 2.3 Flink CDC 核心工作原理 2.4 Flink CDC 使用场景 三、常用的数据同步方案对比 3.1 数据同步概述 3.1.1 数据同步来源 3.2 常用的数据同步方案汇总 3.3 为什么推荐Flink C

【微服务】springboot3 集成 flink cdc 1.17 实现mysql数据同步_springboot集成flink-爱代码爱编程

目录 一、前言 二、常用的数据同步解决方案 2.1 为什么需要数据同步 2.2 常用的数据同步方案 2.2.1 Debezium 2.2.2 DataX 2.2.3 Canal 2.2.4 Sqoop 2.2.5 Kettle 2.2.6 Flink CDC 三、Flink CDC介绍 3.1 Flink CDC 概述 3.1

flink学习(1)——standalone模式的安装-爱代码爱编程

1、上传,解压,重命名,配置环境变量 将文件上传到/opt/modules下 cd /opt/modules tar -zxf flink-1.13.6-bin-scala_2.11.tgz -C /opt/installs/ mv flink-1.13.6/ flink vi /etc/profile export FLINK_HOME=/opt

flink sink的使用_flink addsink kafka-爱代码爱编程

经过一系列Transformation转换操作后,最后一定要调用Sink操作,才会形成一个完整的DataFlow拓扑。只有调用了Sink操作,才会产生最终的计算结果,这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台. flink在批处理中常见的sink print 打印writerAsText 以文本格式输出

flink中普通api的使用-爱代码爱编程

本篇文章从Source、Transformation(转换因子)、sink这三个地方进行讲解 Source: 创建DataStream本地文件SocketKafka Transformation(转换因子): mapFlatMapFilterKeyByReduceUnion和connectSide Outputs sink: print 打印w

flink学习(8)——窗口函数-爱代码爱编程

增量聚合函数 ——指窗口每进入一条数据就计算一次 例如:要计算数字之和,进去一个12 计算结果为20, 再进入一个7 ——结果为27  reduce aggregate(aggregateFunction) package com.bigdata.day04; public class _04_agg函数 { public stat

flink四大基石之window-爱代码爱编程

Window  Flink 认为 Batch 是 Streaming 的一个特例,所以Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完善的窗口机制。 为什么需要Window? 在流处理应用中,数据是连续不断的,有时我们需要做一

flink standalone 集群模式安装部署教程-爱代码爱编程

目录 一、前言 二、环境准备 三、安装步骤 1. 下载并安装 Flink 4. 配置 Flink 5. 配置环境变量 6. 启动 Flink 集群 7. 访问 Flink Web 界面 四、简单测试 五、常见问题和解决办法 1. 启动失败,无法连接到 TaskManager 2. Web 界面无法访问 六、总结 一、前言

flink学习连载文章8-爱代码爱编程

Time的分类 (时间语义) EventTime:事件(数据)时间,是事件/数据真真正正发生时/产生时的时间 IngestionTime:摄入时间,是事件/数据到达流处理系统的时间 ProcessingTime:处理时间,是事件/数据被处理/计算时的系统的时间 EventTime的重要性 假设,你正在去往地下停车场的路上,并且打算用手