Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

2024-07-04 1356阅读

前言

关于Canal的介绍及原理不在此赘述,可自行查阅。笔者在使用Canal同步Mysql实时操作记录至RabbitMQ的过程中,也翻阅了一些大牛们的文章,可能是我使用的Canal版本与文中版本不一致,出现了一些问题,在此总结记录一下可行的方案。

注:本文使用的Canal为 v1.1.7

一、Mysql数据库开启bin_log

  • 先查看目标数据库是否开启bin_log
    SHOW VARIABLES LIKE 'log_bin'
    

    如结果中,log_bin的值为OFF则未开启,为ON则已开启。

    Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

    • 如未开启,可编辑Mysql配置文件:/etc/my.cnf。
      [mysqld]
      log-bin=mysql-bin # 开启binlog
      binlog-format=ROW # 选择ROW模式
      server_id=1 # 配置MySQL replaction需要定义,不和Canal的slaveId重复即可
      

      重启MySQL ,再次通过上一步查看配置是否生效。

      二、数据库创建新用户

      • 创建专用于数据同步的新用户
        -- 创建一个新用户,名称可自行定义
        create user canal@'%' IDENTIFIED by 'canal';
        -- 为新用户授权
        GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
        -- 刷新缓存中的用户数据
        FLUSH PRIVILEGES;
        

        三、配置RabhitMQ

        以下使用的名称均可自行定义,保证唯一即可

        1. 添加交换机

        Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

        2. 添加队列

        Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

        3. 绑定交换机与队列,设置 Routing key

        Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

        四、下载、配置、运行Canal(windows环境)

        1. 下载服务端

        • 可到以下地址下载所需版本的包:github-alibaba-canal

          本文使用较新的 v1.1.7 。

          Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

        • 选择下载 canal.deployer-1.1.7.tar.gz。

          Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

          2. 配置

          • 解压下载包,获得如下文件。

            Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

          • 编辑:conf\canal.properties(仅列出需要修改的配置项)
            # tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
            canal.serverMode = rabbitMQ
            ##################################################
            ######### 		    RabbitMQ	     #############
            ##################################################
            # host 无需添加端口号
            rabbitmq.host = 192.168.0.2
            # 填写 / 即可
            rabbitmq.virtual.host = /
            # RabbitMQ的用户名、密码
            rabbitmq.username = admin
            rabbitmq.password = 123456
            # 上文配置的交换机(exchange)名称:Name
            rabbitmq.exchange = canal.exchange
            # 交换机类型:Type
            rabbitmq.deliveryMode = direct
            # 以下两个字段为自行添加,否则会报空指针异常
            # 队列(queue)名称:Name
            rabbitmq.queue = canal.queue
            # 绑定队列-交换机时的路由秘钥:Routing key
            rabbitmq.routingKey = canal.routing.key
            
            • 编辑:conf\example\instance.properties(仅列出需要修改的配置项)
              # 目标数据库地址
              canal.instance.master.address=192.168.0.1:3306
              # 目标数据库用户名密码
              canal.instance.dbUsername=canal
              canal.instance.dbPassword=Canal@123
              # 表过滤正则表达式(按需修改)
              # 全库全表 : .*\\..*
              # 指定库所有表:  库名\..*   例:test\..*
              # 单表:  库名.表名  例:test.user
              # 多规则组合使用:  库名1\..*,库名2.表名1,库名3.表名2 (逗号分隔)  例 test\..*,test2.user1,test3.user2 (逗号分隔)
              canal.instance.filter.regex=.*\\..*
              # canal.instance.filter.regex=project.sys_user,project.sys_role
              

              3. 运行

              windows环境下直接运行bin\startup.bat,linux环境下执行bin\startup.sh。

              执行启动脚本后,查看日志信息logs\canal\canal.log,出现如下信息,表示启动成功。

              [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
              

              五、测试

              对监听的数据库表做修改操作,至RabbitMQ控制台的队列中查看是否插入消息。

              如下,即成功插入实时操作数据。

              Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

              六、项目中监听处理

              • 创建一个maven项目

                Canal同步Mysql实时操作日志至RabbitMQ,并实现监听及解析处理

                • 在pom.xml中引入spring-boot-starter-amqp依赖,此包集成了对RabbitMQ的支持。
                  	
                  	
                  	    org.springframework.boot
                  	    spring-boot-starter-amqp
                  	
                  	
                  	
                  	
                  	    com.alibaba
                  	    fastjson
                  	    2.0.9.graal
                  	
                  
                  • 修改配置文件application.yml(此处已按个人偏好,文件类型改为yaml),配置RabbitMQ。
                    spring:
                      rabbitmq:
                        host: 192.168.0.2
                        port: 5672
                        username: admin
                        password: 123456
                    
                    • binLog数据实体类BinLogEntity
                      import com.alibaba.fastjson.JSONArray;
                      import com.alibaba.fastjson.JSONObject;
                      import lombok.Data;
                      import java.util.ArrayList;
                      import java.util.HashMap;
                      import java.util.List;
                      import java.util.Map;
                      @Data
                      public class BinLogEntity {
                          /**
                           * 数据库
                           */
                          private String database;
                          /**
                           * 表
                           */
                          private String table;
                          /**
                           * 操作类型
                           */
                          private String type;
                          /**
                           * 操作数据
                           */
                          private JSONArray data;
                          /**
                           * 变更前数据
                           */
                          private JSONArray old;
                          /**
                           * 主键名称
                           */
                          private JSONArray pkNames;
                          /**
                           * 执行sql语句
                           */
                          private String sql;
                          
                          private Long es;
                          private String gtid;
                          private Long id;
                          private Boolean isDdl;
                          private JSONObject mysqlType;
                          private JSONObject sqlType;
                          private Long ts;
                          public  List getData(Class clazz) {
                              if (this.data == null || this.data.size() == 0) {
                                  return null;
                              }
                              return this.data.toJavaList(clazz);
                          }
                          public  List getOld(Class clazz) {
                              if (this.old == null || this.old.size() == 0) {
                                  return null;
                              }
                              return this.old.toJavaList(clazz);
                          }
                          public List getPkNames() {
                              if (this.pkNames == null || this.pkNames.size() == 0) {
                                  return null;
                              }
                              List pkNames = new ArrayList();
                              for (Object pkName : this.pkNames){
                                  pkNames.add(pkName.toString());
                              }
                              return pkNames;
                          }
                          public Map getMysqlType() {
                              if(this.mysqlType == null){
                                  return null;
                              }
                              Map mysqlTypeMap = new HashMap();
                              this.mysqlType.forEach((k, v) -> {
                                  mysqlTypeMap.put(k, v.toString());
                              });
                              return mysqlTypeMap;
                          }
                          public Map getSqlType() {
                              if(this.sqlType == null){
                                  return null;
                              }
                              Map sqlTypeMap = new HashMap();
                              this.sqlType.forEach((k, v) -> {
                                  sqlTypeMap.put(k, Integer.valueOf(v.toString()));
                              });
                              return sqlTypeMap;
                          }
                      }
                      
                      • 操作数据实体类
                        @Data
                        public class User implements Serializable {
                        	private static final long serialVersionUID = 1L;
                        	/**
                        	 * ID
                        	 */
                        	private Long id;
                        	/**
                        	 * 姓名
                        	 */
                        	private String name;
                        	/**
                        	 * 年龄
                        	 */
                        	private Integer age;
                        	/**
                        	 * 电话
                        	 */
                        	private String phone;
                        }
                        
                        • 监听类CanalListener
                          import com.alibaba.fastjson.JSON;
                          import com.example.canalclient.entity.BinLogEntity;
                          import com.example.canalclient.entity.User;
                          import org.springframework.amqp.core.Message;
                          import org.springframework.amqp.rabbit.annotation.Exchange;
                          import org.springframework.amqp.rabbit.annotation.Queue;
                          import org.springframework.amqp.rabbit.annotation.QueueBinding;
                          import org.springframework.amqp.rabbit.annotation.RabbitListener;
                          import org.springframework.messaging.handler.annotation.Payload;
                          import org.springframework.stereotype.Component;
                          /**
                           * 监听数据库数据变化时RabbitMQ发送的信息
                           */
                          @Component
                          public class CanalListener {
                              @RabbitListener(bindings = {
                                      @QueueBinding(
                                              value = @Queue(value = "canal.queue", durable = "true"),
                                              exchange = @Exchange(value = "canal.exchange"),
                                              key = "canal.routing.key"
                                      )
                              })
                          	public void handleDataChange(@Payload Message message) {
                                  // 获取消息内容
                                  String content = new String(message.getBody(), StandardCharsets.UTF_8);
                                  // 反序列化
                                  BinLogEntity binLog = JSON.parseObject(content, BinLogEntity.class);
                                  // 获取操作数据
                                  User user = binLog.getData(User.class).get(0);
                                  User oldUser = binLog.getOld(User.class).get(0);
                                  System.out.println("数据库:" + binLog.getDatabase());
                                  System.out.println("表:" + binLog.getTable());
                                  System.out.println("操作类型:" + binLog.getType());
                                  System.out.println("主键:" + JSON.toJSONString(binLog.getPkNames()));
                                  System.out.println("数据:" + JSON.toJSONString(User));
                                  System.out.println("原数据:" + JSON.toJSONString(User));
                                  System.out.println("MysqlType:" + JSON.toJSONString(binLog.getMysqlType()));
                              }
                          }
                          
                          • 打印结果(修改操作)
                            数据库:project
                            表:sys_user
                            操作类型:UPDATE
                            主键:["id"]
                            数据:{
                            	"id": 1,
                            	"name": "张三",
                            	"age": 21,
                            	"phone": 13333333333
                            }
                            原数据:{
                            	"age": 20,
                            	"phone": 12222222222
                            }
                            MysqlType:{
                            	"id": "bigint unsigned",
                            	"name": "varchar(50)",
                            	"age": "int(3) unsigned",
                            	"phone": "varchar(50)"
                            }
                            

                            至此,已实现对目标数据库实时操作数据进行监听,可根据不同的操作类型,采取相应的业务处理。

                            七、参考文章

                            Canal+Msql+RabbitMq数据库同步配置,看这一篇就够了

                            使用canal同步mysql数据库信息到RabbitMQ

                            Canal配置connector.subscribe和canal.instance.filter.regex遇到的坑

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]