【flink实战】flink-connector-mysql-cdc导致mysql连接器报类型转换错误
文章目录
- 一. 报错现象
- 二. 方案二:重新编译打包flink-connector-cdc
- 1. 排查脚本
- 2. 重新编译打包flink-sql-connector-mysql-cdc-2.4.0.jar
- 3. 测试flink环境
- 三. 方案一:改造flink连接器
一. 报错现象
flink sql任务是:mysql到hdfs的离线任务,flink在消费mysql时报如上错误。
根据经验:mysql 8.x版本会将Timestamp数据类型转换为localdatetime类型,而flink 连接器中并未做此适配,导致任务消费数据后,类型转换报错。
解决方案有两种:
- flink 连接器兼容mysql 8.x,
- 找到mysql 8.x驱动所在的连接器,去掉mysql 8.x驱动。
这里先尝试使用第二种方案。
二. 方案二:重新编译打包flink-connector-cdc
1. 排查脚本
在flink lib目录下查找含有mysql8驱动的jar
#!/usr/bin/env bash ls | while read one_line do class_name=$(jar -vtf $one_line |grep 'com/mysql/cj/jdbc/Driver.class') if [[ ${class_name}x != "x" ]]; then echo "jar:$one_line contains the ${class_name}" fi done
bash check_driver.sh java.util.zip.ZipException: error in opening zip file at java.util.zip.ZipFile.open(Native Method) at java.util.zip.ZipFile.(ZipFile.java:219) at java.util.zip.ZipFile.(ZipFile.java:149) at java.util.zip.ZipFile.(ZipFile.java:120) at sun.tools.jar.Main.list(Main.java:1115) at sun.tools.jar.Main.run(Main.java:293) at sun.tools.jar.Main.main(Main.java:1288) jar:flink-sql-connector-mysql-cdc-2.4.0.jar contains the 730 Thu Dec 16 00:25:38 CST 2021 com/mysql/cj/jdbc/Driver.class java.util.zip.ZipException: zip file is empty at java.util.zip.ZipFile.open(Native Method) at java.util.zip.ZipFile.(ZipFile.java:219) at java.util.zip.ZipFile.(ZipFile.java:149) at java.util.zip.ZipFile.(ZipFile.java:120) at sun.tools.jar.Main.list(Main.java:1115) at sun.tools.jar.Main.run(Main.java:293) at sun.tools.jar.Main.main(Main.java:1288)
发现只有flink-sql-connector-mysql-cdc-2.4.0 jar含有mysql8.x版本的驱动。
2. 重新编译打包flink-sql-connector-mysql-cdc-2.4.0.jar
修改方式如下
3. 测试flink环境
经过重新编译打包后的flink-sql-connector-mysql-cdc-2.4.0.jar中就不包含mysql8.x版本的驱动了,又因为提交任务时,会加载flink lib下所有的jar,故保证此目录下有mysql5.x的包,但不包含mysql8.x即可。
现测试包含mysql5.x的驱动 mysql cdc的任务是否能够正常启动。
测试,报无法初始化MySqlConnectorConfig
单独添加mysql-connector-java-8.0.28.jar到flink lib后运行正常,说明此版本驱动是必要的。
三. 方案一:改造flink连接器
再来关注下一开始的报错堆栈信息:
报错的位置在SqlConverter,没有兼容mysql 8.x的驱动,这里兼容也比较简单:
具体分析原因也可见我之前的文章:
【源码改造】flink JDBC connector 源码改造之 类型转换 java.time.LocalDateTime cannot be cast to java.sql.Timestamp
在连接器中添加对LocalDateTime数据类型的适配,
case TIMESTAMP_WITH_TIME_ZONE: case TIMESTAMP_WITHOUT_TIME_ZONE: return val -> { if (val instanceof LocalDateTime) { return TimestampData.fromTimestamp(Timestamp.valueOf((LocalDateTime) val)); } return TimestampData.fromTimestamp((Timestamp) val); };
这里还需要一点,因为使用的是chunjun的连接器,mysql连接器依赖chunjun-connector-jdbc-base模块,具体的converter也由此模块实现,如果修改此模块,其他依赖此模块的连接器也需要重新打包上传,所以这里需要实现mysql的sqlconverter,以最小化修改的方式修改。
具体见我提到chunjun的pr:
[Feature-#1899][connector][mysql] The connector supports MySQL Driver 8.x #1900