Flink CDC的使用
MySQL数据准备
create database if not exists test; use test; drop table if exists stu; create table stu (id int primary key auto_increment, name varchar(100), age int); insert into stu(name, age) values("张三",18); insert into stu(name, age) values("李四",20); insert into stu(name, age) values("王五",21);
注意:表必须有主键
(图片来源网络,侵删)
开启MySQL binlog
修改MySQL配置,开启binlog
$ sudo vim /etc/my.cnf,添加如下设置
server-id = 1 log-bin=mysql-bin binlog_format=row binlog-do-db=test
注意:启用binlog的数据库,需根据实际情况作出修改
重启mysql
$ sudo systemctl restart mysqld
代码开发
依赖
Flink CDC依赖
com.ververica flink-connector-mysql-cdc 2.4.0 org.apache.flink flink-table-api-java-bridge ${flink.version}
完整依赖
8 8 UTF-8 1.17.1 2.4.0 org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version} org.apache.flink flink-table-api-java-bridge ${flink.version} org.apache.flink flink-table-planner-loader ${flink.version} org.apache.flink flink-table-runtime ${flink.version} org.apache.flink flink-connector-files ${flink.version} mysql mysql-connector-java 8.0.33 org.apache.flink flink-connector-jdbc 1.17-SNAPSHOT org.apache.flink flink-connector-files ${flink.version} provided org.apache.flink flink-connector-datagen ${flink.version} org.apache.flink flink-statebackend-rocksdb ${flink.version} org.apache.hadoop hadoop-client 3.3.4 org.apache.flink flink-connector-kafka ${flink.version} org.apache.flink flink-statebackend-changelog ${flink.version} runtime com.google.code.findbugs jsr305 1.3.9 org.apache.flink flink-table-api-java-bridge ${flink.version} org.apache.flink flink-table-planner-loader ${flink.version} org.apache.flink flink-table-runtime ${flink.version} org.apache.flink flink-connector-files ${flink.version} com.ververica flink-connector-mysql-cdc ${flink-cdc.vesion}
Flink代码
Flink CDC捕获MySQL变更数据(增加、修改、删除),输出到控制台。
import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class FlinkCDCDemo { public static void main(String[] args) throws Exception { // 环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); // 数据源 MySqlSource mySqlSource = MySqlSource.builder() .hostname("node4") .port(3306) .username("root") .password("000000") .databaseList("test") .tableList("test.stu") .deserializer(new JsonDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) .build(); DataStreamSource dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_source").setParallelism(1); // 处理数据 // 输出数据 dataStreamSource.print(); // 执行 env.execute(); } }
运行程序,确保程序无报错,看到如下输出:
18:58:51,826 INFO io.debezium.connector.mysql.MySqlStreamingChangeEventSource - Keepalive thread is running
测试
添加数据
mysql添加数据
mysql> insert into stu(name, age) values("赵六",23);
IDEA控制台输出
{"before":null,"after":{"id":4,"name":"赵六","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831654000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2300,"row":0,"thread":13,"query":null},"op":"c","ts_ms":1719831654692,"transaction":null}
格式化输出
{ "before": null, "after": { "id": 4, "name": "赵六", "age": 23 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1719831654000, "snapshot": "false", "db": "test", "sequence": null, "table": "stu", "server_id": 1, "gtid": null, "file": "mysql-bin.000001", "pos": 2300, "row": 0, "thread": 13, "query": null }, "op": "c", "ts_ms": 1719831654692, "transaction": null }
关注before、after符合增加数据的逻辑,op为c表示添加数据
修改数据
mysql修改数据
mysql> update stu set name="zl", age=19 where name="赵六";
IDEA控制台输出
{"before":{"id":4,"name":"赵六","age":23},"after":{"id":4,"name":"zl","age":19},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831987000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2604,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1719831987238,"transaction":null}
格式化输出
{ "before": { "id": 4, "name": "赵六", "age": 23 }, "after": { "id": 4, "name": "zl", "age": 19 }, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1719831987000, "snapshot": "false", "db": "test", "sequence": null, "table": "stu", "server_id": 1, "gtid": null, "file": "mysql-bin.000001", "pos": 2604, "row": 0, "thread": 13, "query": null }, "op": "u", "ts_ms": 1719831987238, "transaction": null }
关注before、after符合更新的逻辑,op为u表示更新数据
删除数据
mysql删除数据
mysql> delete from stu where id=4;
IDEA控制台输出
{"before":{"id":4,"name":"zl","age":19},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719832151000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2913,"row":0,"thread":13,"query":null},"op":"d","ts_ms":1719832151198,"transaction":null}
格式化输出
{ "before": { "id": 4, "name": "zl", "age": 19 }, "after": null, "source": { "version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1719832151000, "snapshot": "false", "db": "test", "sequence": null, "table": "stu", "server_id": 1, "gtid": null, "file": "mysql-bin.000001", "pos": 2913, "row": 0, "thread": 13, "query": null }, "op": "d", "ts_ms": 1719832151198, "transaction": null }
关注before、after符合删除的逻辑,op为d表示删除数据
完成!enjoy it!
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。