超详细的Canal入门,看这篇就够了!

article/2025/10/3 13:13:08

思维导图

在这里插入图片描述

文章已收录Github精选,欢迎Star:https://github.com/yehongzhi/learningSummary

前言

我们都知道一个系统最重要的是数据,数据是保存在数据库里。但是很多时候不单止要保存在数据库中,还要同步保存到Elastic Search、HBase、Redis等等。

这时我注意到阿里开源的框架Canal,他可以很方便地同步数据库的增量数据到其他的存储应用。所以在这里总结一下,分享给各位读者参考~

一、什么是canal

我们先看官网的介绍

canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

这句介绍有几个关键字:增量日志,增量数据订阅和消费

这里我们可以简单地把canal理解为一个用来同步增量数据的一个工具

接下来我们看一张官网提供的示意图:

canal的工作原理就是把自己伪装成MySQL slave,模拟MySQL slave的交互协议向MySQL Mater发送 dump协议,MySQL mater收到canal发送过来的dump请求,开始推送binary log给canal,然后canal解析binary log,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。

二、canal能做什么

以下参考canal官网。

与其问canal能做什么,不如说数据同步有什么作用。

但是canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护
  • 业务cache(缓存)刷新
  • 带业务逻辑的增量数据处理

三、如何搭建canal

3.1 首先有一个MySQL服务器

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

我的Linux服务器安装的MySQL服务器是5.7版本。

MySQL的安装这里就不演示了,比较简单,网上也有很多教程。

然后在MySQL中需要创建一个用户,并授权:

-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'Canal@123456';

下一步在MySQL配置文件my.cnf设置如下信息:

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

改了配置文件之后,重启MySQL,使用命令查看是否打开binlog模式:
在这里插入图片描述
查看binlog日志文件列表:
在这里插入图片描述
查看当前正在写入的binlog文件:
在这里插入图片描述
MySQL服务器这边就搞定了,很简单。

3.2 安装canal

去官网下载页面进行下载:https://github.com/alibaba/canal/releases

我这里下载的是1.1.4的版本:
在这里插入图片描述
解压canal.deployer-1.1.4.tar.gz,我们可以看到里面有四个文件夹:

接着打开配置文件conf/example/instance.properties,配置信息如下:

## mysql serverId , v1.0.26+ will autoGen
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# mysql主库链接时起始的binlog偏移量
canal.instance.master.position=154
# mysql主库链接时起始的binlog的时间戳
canal.instance.master.timestamp=
canal.instance.master.gtid=# username/password
# 在MySQL服务器授权的账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# 字符集
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false# table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开
canal.instance.filter.regex=.*\\..*
# mysql 数据解析表的黑名单,多个表用,隔开
canal.instance.filter.black.regex=

我这里用的是win10系统,所以在bin目录下找到startup.bat启动:

启动就报错,坑呀:

要修改一下启动的脚本startup.bat:
在这里插入图片描述
然后再启动脚本:
在这里插入图片描述
这就启动成功了。

Java客户端操作

首先引入maven依赖:

<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.4</version>
</dependency>

然后创建一个canal项目,使用SpringBoot构建,如图所示:
在这里插入图片描述
在CannalClient类使用Spring Bean的生命周期函数afterPropertiesSet():

@Component
public class CannalClient implements InitializingBean {private final static int BATCH_SIZE = 1000;@Overridepublic void afterPropertiesSet() throws Exception {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");try {//打开连接connector.connect();//订阅数据库表,全部表connector.subscribe(".*\\..*");//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿connector.rollback();while (true) {// 获取指定数量的数据Message message = connector.getWithoutAck(BATCH_SIZE);//获取批量IDlong batchId = message.getId();//获取批量的数量int size = message.getEntries().size();//如果没有数据if (batchId == -1 || size == 0) {try {//线程休眠2秒Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}} else {//如果有数据,处理数据printEntry(message.getEntries());}//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。connector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}/*** 打印canal server解析binlog获得的实体类信息*/private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {//开启/关闭事务的实体类型,跳过continue;}//RowChange对象,包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等RowChange rowChage;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);}//获取操作类型:insert/update/delete类型EventType eventType = rowChage.getEventType();//打印Header信息System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));//判断是否是DDL语句if (rowChage.getIsDdl()) {System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());}//获取RowChange对象里的每一行数据,打印出来for (RowData rowData : rowChage.getRowDatasList()) {//如果是删除语句if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());//如果是新增语句} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());//如果是更新的语句} else {//变更前的数据System.out.println("------->; before");printColumn(rowData.getBeforeColumnsList());//变更后的数据System.out.println("------->; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

以上就完成了Java客户端的代码。这里不做具体的处理,仅仅是打印,先有个直观的感受。

最后我们开始测试,首先启动MySQL、Canal Server,还有刚刚写的Spring Boot项目。然后创建表:

CREATE TABLE `tb_commodity_info` (`id` varchar(32) NOT NULL,`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',`number` int(10) DEFAULT '0' COMMENT '商品数量',`description` varchar(2048) DEFAULT '' COMMENT '商品描述',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

然后我们在控制台就可以看到如下信息:

如果新增一条数据到表中:

INSERT INTO tb_commodity_info VALUES('3e71a81fd80711eaaed600163e046cc3','叉烧包','3.99',3,'又大又香的叉烧包,老人小孩都喜欢');

控制台可以看到如下信息:
在这里插入图片描述

总结

canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时,其实是很多企业一种比较常见的数据同步的方案。

通过上面的学习之后,我们应该都明白canal是什么,它的原理,还有用法。实际上这仅仅只是入门,因为实际项目中我们不是这样玩的…

实际项目我们是配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理

Canal的部署也是支持集群的,需要配合ZooKeeper进行集群管理。

Canal还有一个简单的Web管理界面。

下一篇就讲一下集群部署Canal,配合使用Kafka,同步数据到Redis

参考资料:Canal官网

絮叨

上面所有例子的代码都上传Github了:

https://github.com/yehongzhi/mall

如果你觉得这篇文章对你有用,就点赞,关注,评论吧~

你的三连是我创作的最大动力~

拒绝做一条咸鱼,我是一个努力让大家记住的程序员。我们下期再见!!!
在这里插入图片描述

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!


http://chatgpt.dhexx.cn/article/SCOWCgsj.shtml

相关文章

数据同步神器Canel-day01

背景 关于数据同步的方式有很多种&#xff0c;现在有一个场景需要将mysql数据库的数据主动同步到我们的工程中&#xff0c;并且能再mysql数据库客户端更改某一行的数据也能将数据同步到另一个数据库或者工程中&#xff0c;对于这种场景的使用我们应该怎么去实现呢&#xff1f;…

Canal使用流程、部署安装文档

文章目录 背景信息使用流程步骤一&#xff1a;准备MySQL数据源步骤二&#xff1a;创建索引步骤三&#xff1a;安装并启动Canal-server(Deployer)步骤四&#xff1a;部署Client-Adapter步骤五&#xff1a;验证增量数据同步步骤六&#xff1a;同步MySQL全量数据&#xff08;如果不…

Canel-简介使用

简介 canal 的工作原理 MySQL 主从复制过程 ➢ Master 主库将改变记录&#xff0c;写到二进制日志(binary log)中 ➢ Slave 从库向 mysql master 发送 dump 协议&#xff0c;将 master 主库的 binary log events 拷贝到它的中继日志(relay log)&#xff1b; ➢ Slave 从…

检查页面Session是否过期,过期执行相应操作 解决方法

how to check session is expired or not if expired then redirect to login page 在项目中&#xff0c;如果客户打开页面时间过久容易导致页面Session过期&#xff0c;再进行任何操作时都会提示“Asp.Net session has expired”&#xff0c;这样毕竟都用户不太友好&#xff0…

thinkphp如何有效的设置session过期时间

thinkphp提供了一个参数让我们配置session过期时间。 SESSION_OPTIONS array(expire > 3600 ); 然而这一配置是否真的有效&#xff1f;在多次测试之后&#xff0c;不遂人意。 why&#xff1f;那我们试着从源码上分析这个配置参数的&#xff0c;它是怎么让尝试着然我们的…

session过期时间设置

设置session过期有三种方法&#xff1a; 1.在tomcat中进行设置 tomcat的conf文件下的&#xff0c;web.xml文件中 tomcat默认session超时时间为30分钟&#xff0c;可以根据需要修改&#xff0c;负数或0为不限制session失效时间 这里要注意这个session设置的时间是根据服务器来…

springboot+shiro中自定义session过期时间

在springboot工程中&#xff0c;使用shiro作为权限框架&#xff0c;并采用redis来管理session时&#xff0c;如何自定义session过期时间&#xff1f; 上面与会话或缓存相关的组件有&#xff1a; Session Manager&#xff1a;会话管理器Session DAO&#xff1a;会话 DAO&#…

JAVA WEB 设置session过期时间

1.在web容器中设置 &#xff08;以tomcat为例,Tomcat默认session超时时间为30分钟&#xff09; 在tomcat/conf/web.xml里面进行配置&#xff0c;单位是分钟&#xff0c;永不过期可以设置-1 <session-config> <!-- 时间单位为分钟--> <session-timeout>30&…

springboot2.0设置session过期时间。

目的&#xff1a;springboot2.0设置session过期时间。 网上很多设置 springboot session 过期时间&#xff0c;已经不适合 springboot2.0. 下面这个我亲测有效。 请注意格式&#xff1a;我这个用的是 application.yml servlet:session:timeout: 3600s 所有文章优先发布在个人…

Java 设置session过期时间

设置session过期或超时时间 设置session的过期或超时时间&#xff0c;有三个地方&#xff1a; a、tomcat的web.xml中&#xff0c;该单位为分&#xff1a; Xml代码 <session-config> <session-timeout>720</session-timeout> </session-config>…

web 项目中设置session过期时间

java web项目中要想设置session过期时间&#xff0c;有三种设置方法&#xff0c;都是给与某个过期时间值&#xff0c;其中-1 代表session永远不会过期。 1. 第一种方式&#xff1a;通过代码设置方式&#xff0c;其中600表示600秒 2. 第二种方式&#xff1a;通过web.xml方式&am…

session会话过期时间设置

具体设置很简单&#xff0c;方法有三种&#xff1a; &#xff08;1&#xff09;在主页面或者公共页面中加入&#xff1a;session.setMaxInactiveInterval(900); 参数900单位是秒&#xff0c;即在没有活动15分钟后&#xff0c;session将失效。设置为-1将永不关闭。 这里要注意…

关于Session过期/失效的理解,session与cookie的交互

一直好奇关于Session的过期&#xff0c;一种说法是关闭浏览器即Session失效&#xff0c;另一种说法是可以设置Session的过期时间&#xff0c;时间到了自动过期。 这两种说法到底是怎么回事&#xff1f;Session过期跟Cookie过期又有什么关系&#xff1f; 网上搜了几篇相关文章…

数据库 存储过程

创建存储过程 create procedures_student sex varchar&#xff08;10&#xff09; as select * from 学生信息 where 性别sex 这样就创建了一个存储过程 exec proc_student sex女 使用带默认值的参数 create proc p_employee departmentid varchar&…

数据库--存储过程

介绍 对sql语句进行封装、复用 创建、调用 --存储过程 --创建 create procedure p1() beginselect count(*) from t_test;end;--调用 call p1();存储过程查看、删除 --查看 select * from information_schema.ROUTINES WHERE ROUTINE_SCHEMA test SHOW create procedure p1…

SQL Sever数据库存储过程

一、背景介绍 1.遇到存储过程 回顾之前知识&#xff0c;使用在当下&#xff08;毕业设计&#xff09; 2.了解周边知识 二、思路&方案 1.了解存储过程定义、语法、种类 2.存储过程有什么优缺点 3。存储过程与触发器和函数的联系 三、过程 1.什么是存储过程&#xff1f;…

数据库MySQL —— 存储过程

目录 一、介绍 二、基本语法 三、变量 1. 系统变量 2. 用户自定义变量 3. 局部变量 四、流程控制语句 1. if判断 2. 参数 3. case 4. 循环 4.1 while 4.2 repeat 4.3 loop 五、游标 - cursor 六、条件处理程序 - handler 七、存储函数 一、介绍 存储过…

mysql数据库之存储过程

一、存储过程简介。 存储过程是事先经过编译并存储在数据库中的一段sql语句的集合&#xff0c;调用存储过程可以简化应用开发人员的很多工作&#xff0c;减少数据在数据库和应用服务器之间的传输&#xff0c;对于提高数据处理的效率是也有好处的。 存储过程思想上很简单&…

MySQL数据库存储过程

存储过程相关命令汇总存储过程存储过程优化再说存储过程的输出参数再说WHILE 和 REPEAT循环 存储过程&#xff08;Stored Procedure&#xff09;是在大型数据库系统中&#xff0c;一组为了完成特定功能的SQL 语句集&#xff0c;存储在数据库中&#xff0c;经过第一次编译后再次…

MySQL数据库存储过程讲解与实例

存储过程简介 SQL语句需要先编译然后执行&#xff0c;而存储过程&#xff08;Stored Procedure&#xff09;是一组为了完成特定功能的SQL语句集&#xff0c;经编译后存储在数据库中&#xff0c;用户通过指定存储过程的名字并给定参数&#xff08;如果该存储过程带有参数&#x…