Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理
前言
关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。
注:本文使用的Canal为 v1.1.7
一、Mysql数据库开启bin_log
- 先查看目标数据库是否开启bin_log
SHOW VARIABLES LIKE 'log_bin'
如结果中,log_bin的值为OFF则未开启,为ON则已开启。
- 如未开启,可编辑Mysql配置文件:/etc/my.cnf。
[mysqld] log-bin=mysql-bin # 开启binlog binlog-format=ROW # 选择ROW模式 server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
重启MySQL ,再次通过上一步查看配置是否生效。
二、数据库创建新用户
- 创建专用于数据同步的新用户
-- 创建一个新用户,名称可自行定义 create user canal@'%' IDENTIFIED by 'canal'; -- 为新用户授权 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; -- 刷新缓存中的用户数据 FLUSH PRIVILEGES;
三、配置RabhitMQ
以下使用的名称均可自行定义,保证唯一即可
1. 添加交换机
2. 添加队列
3. 绑定交换机与队列,设置 Routing key
四、下载、配置、运行Canal(windows环境)
1. 下载服务端
-
可到以下地址下载所需版本的包:github-alibaba-canal
本文使用较新的 v1.1.7 。
-
选择下载 canal.deployer-1.1.7.tar.gz。
2. 配置
- 解压下载包,获得如下文件。
- 编辑:conf\canal.properties(仅列出需要修改的配置项)
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ canal.serverMode = rabbitMQ ################################################## ######### RabbitMQ ############# ################################################## # host 无需添加端口号 rabbitmq.host = 192.168.0.2 # 填写 / 即可 rabbitmq.virtual.host = / # RabbitMQ的用户名、密码 rabbitmq.username = admin rabbitmq.password = 123456 # 上文配置的交换机(exchange)名称:Name rabbitmq.exchange = canal.exchange # 交换机类型:Type rabbitmq.deliveryMode = direct # 以下两个字段为自行添加,否则会报空指针异常 # 队列(queue)名称:Name rabbitmq.queue = canal.queue # 绑定队列-交换机时的路由秘钥:Routing key rabbitmq.routingKey = canal.routing.key
- 编辑:conf\example\instance.properties(仅列出需要修改的配置项)
# 目标数据库地址 canal.instance.master.address=192.168.0.1:3306 # 目标数据库用户名密码 canal.instance.dbUsername=canal canal.instance.dbPassword=Canal@123 # 表过滤正则表达式(按需修改) # 全库全表 : .*\\..* # 指定库所有表: 库名\..* 例:test\..* # 单表: 库名.表名 例:test.user # 多规则组合使用: 库名1\..*,库名2.表名1,库名3.表名2 (逗号分隔) 例 test\..*,test2.user1,test3.user2 (逗号分隔) canal.instance.filter.regex=.*\\..* # canal.instance.filter.regex=project.sys_user,project.sys_role
3. 运行
windows环境下直接运行bin\startup.bat,linux环境下执行bin\startup.sh。
执行启动脚本后,查看日志信息logs\canal\canal.log,出现如下信息,表示启动成功。
[main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
五、测试
对监听的数据库表做修改操作,至RabbitMQ控制台的队列中查看是否插入消息。
如下,即成功插入实时操作数据。
六、项目中监听处理
- 创建一个maven项目
- 在pom.xml中引入spring-boot-starter-amqp依赖,此包集成了对RabbitMQ的支持。
org.springframework.boot spring-boot-starter-amqp com.alibaba fastjson 2.0.9.graal
- 修改配置文件application.yml(此处已按个人偏好,文件类型改为yaml),配置RabbitMQ。
spring: rabbitmq: host: 192.168.0.2 port: 5672 username: admin password: 123456- binLog数据实体类BinLogEntity
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.Data; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @Data public class BinLogEntity { /** * 数据库 */ private String database; /** * 表 */ private String table; /** * 操作类型 */ private String type; /** * 操作数据 */ private JSONArray data; /** * 变更前数据 */ private JSONArray old; /** * 主键名称 */ private JSONArray pkNames; /** * 执行sql语句 */ private String sql; private Long es; private String gtid; private Long id; private Boolean isDdl; private JSONObject mysqlType; private JSONObject sqlType; private Long ts; public List getData(Class clazz) { if (this.data == null || this.data.size() == 0) { return null; } return this.data.toJavaList(clazz); } public List getOld(Class clazz) { if (this.old == null || this.old.size() == 0) { return null; } return this.old.toJavaList(clazz); } public List getPkNames() { if (this.pkNames == null || this.pkNames.size() == 0) { return null; } List pkNames = new ArrayList(); for (Object pkName : this.pkNames){ pkNames.add(pkName.toString()); } return pkNames; } public Map getMysqlType() { if(this.mysqlType == null){ return null; } Map mysqlTypeMap = new HashMap(); this.mysqlType.forEach((k, v) -> { mysqlTypeMap.put(k, v.toString()); }); return mysqlTypeMap; } public Map getSqlType() { if(this.sqlType == null){ return null; } Map sqlTypeMap = new HashMap(); this.sqlType.forEach((k, v) -> { sqlTypeMap.put(k, Integer.valueOf(v.toString())); }); return sqlTypeMap; } }- 操作数据实体类
@Data public class User implements Serializable { private static final long serialVersionUID = 1L; /** * ID */ private Long id; /** * 姓名 */ private String name; /** * 年龄 */ private Integer age; /** * 电话 */ private String phone; }- 监听类CanalListener
import com.alibaba.fastjson.JSON; import com.example.canalclient.entity.BinLogEntity; import com.example.canalclient.entity.User; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; /** * 监听数据库数据变化时RabbitMQ发送的信息 */ @Component public class CanalListener { @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "canal.queue", durable = "true"), exchange = @Exchange(value = "canal.exchange"), key = "canal.routing.key" ) }) public void handleDataChange(@Payload Message message) { // 获取消息内容 String content = new String(message.getBody(), StandardCharsets.UTF_8); // 反序列化 BinLogEntity binLog = JSON.parseObject(content, BinLogEntity.class); // 获取操作数据 User user = binLog.getData(User.class).get(0); User oldUser = binLog.getOld(User.class).get(0); System.out.println("数据库:" + binLog.getDatabase()); System.out.println("表:" + binLog.getTable()); System.out.println("操作类型:" + binLog.getType()); System.out.println("主键:" + JSON.toJSONString(binLog.getPkNames())); System.out.println("数据:" + JSON.toJSONString(User)); System.out.println("原数据:" + JSON.toJSONString(User)); System.out.println("MysqlType:" + JSON.toJSONString(binLog.getMysqlType())); } }- 打印结果(修改操作)
数据库:project 表:sys_user 操作类型:UPDATE 主键:["id"] 数据:{ "id": 1, "name": "张三", "age": 21, "phone": 13333333333 } 原数据:{ "age": 20, "phone": 12222222222 } MysqlType:{ "id": "bigint unsigned", "name": "varchar(50)", "age": "int(3) unsigned", "phone": "varchar(50)" }至此,已实现对目标数据库实时操作数据进行监听,可根据不同的操作类型,采取相应的业务处理。
七、参考文章
Canal+Msql+RabbitMq数据库同步配置,看这一篇就够了
使用canal同步mysql数据库信息到RabbitMQ
Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑
- 打印结果(修改操作)
- 监听类CanalListener
- 操作数据实体类
- binLog数据实体类BinLogEntity
- 修改配置文件application.yml(此处已按个人偏好,文件类型改为yaml),配置RabbitMQ。
- 在pom.xml中引入spring-boot-starter-amqp依赖,此包集成了对RabbitMQ的支持。
- 创建一个maven项目
- 编辑:conf\example\instance.properties(仅列出需要修改的配置项)
-
- 创建专用于数据同步的新用户
- 如未开启,可编辑Mysql配置文件:/etc/my.cnf。
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!









