Flink CDC的使用

07-08 1015阅读

MySQL数据准备

-- Sample database/table for the Flink CDC demo.
-- utf8mb4 is set explicitly so the Chinese sample names are stored correctly even on
-- servers whose default character set is latin1 (e.g. MySQL 5.7).
create database if not exists test default character set utf8mb4;
use test;
drop table if exists stu;
-- The table needs a primary key: Flink CDC's incremental snapshot splits the table by it.
create table stu (
    id   int primary key auto_increment,
    name varchar(100),
    age  int
);
-- Make sure the literals below are sent/interpreted as utf8mb4.
set names utf8mb4;
-- Single-quoted literals are standard SQL and keep working under sql_mode=ANSI_QUOTES,
-- where double quotes would be parsed as identifiers.
insert into stu (name, age) values
    ('张三', 18),
    ('李四', 20),
    ('王五', 21);

注意:表必须有主键(Flink CDC 2.x 的增量快照读取默认按主键对表进行分片)。

Flink CDC的使用
(图片来源网络,侵删)

开启MySQL binlog

修改MySQL配置,开启binlog

$ sudo vim /etc/my.cnf,在 [mysqld] 段中添加如下设置:

[mysqld]
# Unique ID of this MySQL server. The Flink CDC source connects as a replica with its
# own server-id, which must not collide with this value.
server-id = 1
# Enable the binary log; files are named mysql-bin.000001, mysql-bin.000002, ...
log-bin=mysql-bin
# Row-based logging: Flink CDC (Debezium) reads row change events, not SQL statements.
binlog_format=row
# Log complete before/after row images (FULL is the MySQL default; stated explicitly
# because the CDC connector relies on it).
binlog_row_image=full
# Only write binlog events for database `test`; adjust (or repeat the option) for the
# databases you want to capture.
binlog-do-db=test

注意:binlog-do-db 指定需要记录 binlog 的数据库,请根据实际情况修改;如需捕获多个数据库,可重复配置该项。

重启mysql

$ sudo systemctl restart mysqld
# Verify the new settings took effect: expect log_bin = ON and binlog_format = ROW
$ mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"

代码开发

依赖

Flink CDC依赖

		
        
            com.ververica
            flink-connector-mysql-cdc
            2.4.0
        
        
            org.apache.flink
            flink-table-api-java-bridge
            ${flink.version}
        

完整依赖

    
        8
        8
        UTF-8
        1.17.1
        2.4.0
    
    
        
            org.apache.flink
            flink-streaming-java
            ${flink.version}
        
        
            org.apache.flink
            flink-clients
            ${flink.version}
        
        
            org.apache.flink
            flink-table-api-java-bridge
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-loader
            ${flink.version}
        
        
            org.apache.flink
            flink-table-runtime
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-files
            ${flink.version}
        
        
            mysql
            mysql-connector-java
            8.0.33
        
        
        
            org.apache.flink
            flink-connector-jdbc
            1.17-SNAPSHOT
        
        
            org.apache.flink
            flink-connector-files
            ${flink.version}
            provided
        
        
            org.apache.flink
            flink-connector-datagen
            ${flink.version}
        
        
            org.apache.flink
            flink-statebackend-rocksdb
            ${flink.version}

        
        
            org.apache.hadoop
            hadoop-client
            3.3.4

        
        
            org.apache.flink
            flink-connector-kafka
            ${flink.version}
        
        
            org.apache.flink
            flink-statebackend-changelog
            ${flink.version}
            runtime
        
        
            com.google.code.findbugs
            jsr305
            1.3.9
        
        
            org.apache.flink
            flink-table-api-java-bridge
            ${flink.version}
        
        
            org.apache.flink
            flink-table-planner-loader
            ${flink.version}
        
        
            org.apache.flink
            flink-table-runtime
            ${flink.version}
        
        
            org.apache.flink
            flink-connector-files
            ${flink.version}
        
        
        
            com.ververica
            flink-connector-mysql-cdc
            ${flink-cdc.vesion}
        
    

 Flink代码

Flink CDC捕获MySQL变更数据(增加、修改、删除),输出到控制台。

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Minimal Flink CDC demo: reads the existing rows and subsequent binlog changes
 * (insert/update/delete) of MySQL table {@code test.stu} and prints each change
 * event to stdout as a Debezium-style JSON string.
 */
public class FlinkCDCDemo {
    public static void main(String[] args) throws Exception {
        // Execution environment; default parallelism for operators is 4.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        // Checkpoint every 3 s, as the Flink CDC MySQL connector example does: the source
        // keeps its read position in Flink state, so checkpoints let a restarted job resume
        // instead of starting over.
        env.enableCheckpointing(3000);

        // Source: MySQL CDC. Typed as <String> because JsonDebeziumDeserializationSchema
        // turns every change record into a JSON string (avoids raw-type usage).
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("node4")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("test")
                // Tables must be qualified as <database>.<table>.
                .tableList("test.stu")
                .deserializer(new JsonDebeziumDeserializationSchema())
                // initial(): first snapshot the existing rows, then continue from the binlog.
                .startupOptions(StartupOptions.initial())
                .build();

        // A single CDC reader; downstream operators use the environment parallelism.
        DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),
                "mysql_source").setParallelism(1);

        // Processing: none in this demo.
        // Sink: print every change event.
        dataStreamSource.print();

        // Submit the job; blocks while the streaming job runs.
        env.execute();
    }
}

运行程序,确保程序无报错。由于使用了 StartupOptions.initial(),程序会先以快照方式输出表中已有的 3 条数据(op 为 r),随后切换为读取 binlog;看到如下日志即表示已进入 binlog 读取阶段:

18:58:51,826 INFO  io.debezium.connector.mysql.MySqlStreamingChangeEventSource   - Keepalive thread is running

测试

添加数据

mysql添加数据

mysql> insert into stu(name, age) values("赵六",23);

IDEA控制台输出

{"before":null,"after":{"id":4,"name":"赵六","age":23},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831654000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2300,"row":0,"thread":13,"query":null},"op":"c","ts_ms":1719831654692,"transaction":null}

格式化输出

{
    "before": null,
    "after": {
        "id": 4,
        "name": "赵六",
        "age": 23
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1719831654000,
        "snapshot": "false",
        "db": "test",
        "sequence": null,
        "table": "stu",
        "server_id": 1,
        "gtid": null,
        "file": "mysql-bin.000001",
        "pos": 2300,
        "row": 0,
        "thread": 13,
        "query": null
    },
    "op": "c",
    "ts_ms": 1719831654692,
    "transaction": null
}

可以看到 before 为 null、after 为新插入的行,符合新增数据的语义;op 为 c(create)表示新增数据。

修改数据

mysql修改数据

mysql> update stu set name="zl", age=19 where name="赵六";

IDEA控制台输出

{"before":{"id":4,"name":"赵六","age":23},"after":{"id":4,"name":"zl","age":19},"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719831987000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2604,"row":0,"thread":13,"query":null},"op":"u","ts_ms":1719831987238,"transaction":null}

格式化输出

{
    "before": {
        "id": 4,
        "name": "赵六",
        "age": 23
    },
    "after": {
        "id": 4,
        "name": "zl",
        "age": 19
    },
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1719831987000,
        "snapshot": "false",
        "db": "test",
        "sequence": null,
        "table": "stu",
        "server_id": 1,
        "gtid": null,
        "file": "mysql-bin.000001",
        "pos": 2604,
        "row": 0,
        "thread": 13,
        "query": null
    },
    "op": "u",
    "ts_ms": 1719831987238,
    "transaction": null
}

可以看到 before 为修改前的行、after 为修改后的行,符合更新的语义;op 为 u(update)表示更新数据。

删除数据

mysql删除数据

mysql> delete from stu where id=4;

IDEA控制台输出

{"before":{"id":4,"name":"zl","age":19},"after":null,"source":{"version":"1.9.7.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1719832151000,"snapshot":"false","db":"test","sequence":null,"table":"stu","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2913,"row":0,"thread":13,"query":null},"op":"d","ts_ms":1719832151198,"transaction":null}
​

格式化输出

{
    "before": {
        "id": 4,
        "name": "zl",
        "age": 19
    },
    "after": null,
    "source": {
        "version": "1.9.7.Final",
        "connector": "mysql",
        "name": "mysql_binlog_source",
        "ts_ms": 1719832151000,
        "snapshot": "false",
        "db": "test",
        "sequence": null,
        "table": "stu",
        "server_id": 1,
        "gtid": null,
        "file": "mysql-bin.000001",
        "pos": 2913,
        "row": 0,
        "thread": 13,
        "query": null
    },
    "op": "d",
    "ts_ms": 1719832151198,
    "transaction": null
}

可以看到 before 为被删除的行、after 为 null,符合删除的语义;op 为 d(delete)表示删除数据。

完成!enjoy it!

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]