使用Canal组件订阅MySQL binlog数据增量

article/2025/10/3 10:46:24

一、简介

Canal是一款强大的开源组件,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

1. canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)
    工作原理简略图
    2. 基于日志增量订阅和消费的业务包括
  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理
    当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

二、环境准备

1. 开启MySQL的Binlog日志写入功能
找到MySQL服务器上的MySQL配置文件my.ini,增添以下配置:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:修改完配置之后,一定要重启MySQL!!!
以Windows系统为例:
打开桌面,右键点击“此电脑”—> 管理 —> 服务和应用程序 —>服务,找到MySQL服务,右键,选择重启。
在这里插入图片描述

2. 在MySQL中创建一个名为 ‘canal’ 的用户,并授予该用户在MySQL所有数据库中的所有表上进行 SELECT、复制从服务器和复制客户端操作的权限:

CREATE USER canal IDENTIFIED BY 'canal';  -- 创建了一个名为 canal 的用户,并设置其密码为 'canal'
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

第2条授权语句是指:授予了 canal 用户在所有数据库中的所有表上进行 SELECT、复制从服务器和复制客户端操作的权限。其中\*.*表示所有数据库中的所有表,'canal'@'%' 表示在所有的 IP 地址上都可以使用这个用户进行操作。这个授权语句可以根据实际需求进行修改,比如指定特定的数据库和表,或者只在某个特定的 IP 地址上授权等。

3. 下载Canal组件
下载 canal, 访问release页面,以1.1.6版本为例:
在这里插入图片描述
将下载下来的canal.deployer-1.1.6.tar.gz安装包进行解压缩:
在这里插入图片描述
打开/conf/example/instance.properties配置修改

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*

可以根据实际需要修改这些配置项,配置完后将配置文件保存为 .properties 文件,并将文件放置在 Canal 的 conf 目录下,然后启动 Canal 即可开始监控 MySQL 实例的 binlog

4. 启动Canal服务
Linux:

sh bin/startup.sh

Windows:
双击startup.sh脚本文件。

注意:如果在Windows中启动canal下的startup.sh后出现闪退,且/canal/logs/canal_stdout.log日志为:

Unrecognized VM option 'AggressiveOpts'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

这个问题可能是因为 startup.sh 脚本中使用了在 Windows 平台下不支持的 JVM 参数,导致 JVM 启动失败。

解决办法:
可以尝试修改 startup.sh 文件,将其中的 JVM 参数改为 Windows 下支持的格式。具体来说,可以将 startup.sh 中的 -XX:+AggressiveOpts 参数改为 -XX:-UseAggressiveOpts 或者去掉这个参数。

三、通过Java客户端监听

方式一:拉取canal源代码

https://github.com/alibaba/canal.git

直接打开com.alibaba.otter.canal.example.SimpleCanalClientTest类。

方式二、自行创建项目工程

  1. 新建maven工程,修改pom文件,并引入maven依赖:
<dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.6</version>
</dependency>
  1. 编写简单的Canal客户端demo:
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;public class SimpleCanalClientExample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmptyCount = 120;while (emptyCount < totalEmptyCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------&gt; before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------&gt; after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}
  1. 运行client。

四、触发数据库变更

  1. 在MySQL中,选择一个数据表进行更新操作:
    在这里插入图片描述
  2. client客户端控制台输出如下信息:
    在这里插入图片描述

http://chatgpt.dhexx.cn/article/cyHuKnSj.shtml

相关文章

mysql进阶:canal搭建主从|集群架构

0.引言 之前我们讲解过canal的各种应用&#xff0c;但是对于生产环境来讲&#xff0c;服务高可用是必须保证的。因此canal单节点是不能满足我们的需求的。就需要搭建canal集群。 1. canal集群模式 从架构方式上出发&#xff0c;我们用来保证服务高可用的手段主要是主从架构、…

ElasticSearch 7.15.2 使用java canal 接入实现灵活化增量数据准实时同步

前言&#xff1a; ①canal.adapter-1.1.5 支持一对一单表的增量数据同步ElasticSearch 7&#xff1b; ②对于多表聚合场景的SQL满足不了我们的业务需求。 ③采用java canal 接入&#xff0c;可以实现灵活化增量数据准实时同步 文章目录 一、java canal 接入1. 依赖导入2. 增加配…

浅入浅出keepalived+mysql实现高可用双机热备

当数据库发生宕机的情况,如果配置了数据库主从同步模式或主主同步模式,则可以从从库中获取数据。 当数据库发生宕机的情况,要求应用系统实现高可用,应用系统不会受到影响,需要对mysql进行双机热备实现数据库的高可用。 实现双机热备的方式有Mycat组件方式、canel组件方式、…

【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比

欢迎关注博客主页&#xff1a;微信搜&#xff1a;import_bigdata&#xff0c;大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞、收藏、留言 &#xff0c;欢迎留言交流&#xff01;本文由【王知无】原创&#xff0c;首发于 CSDN博客&#xff01;本文首发CSDN论…

4、Canal的 instance模块

一、总体结构 如确所述&#xff0c;Canal server的模式有两种&#xff1a;manager和spring 在core模块中&#xff0c;定义了CanalInstance接口&#xff0c;以及其抽象类子类AbstractCanalInstance。 在spring模块&#xff0c;提供了基于spring配置方式的CanalInstanceWithSpr…

canal实现mysql数据同步

前言 canal是实现mysql数据备份,异地灾备,异地数据同步等重要的中间件,在实际的业务场景中有着广泛的使用,本文基于小编所在项目中一个异地数据同步的场景为例,通过案例演示下利用canal实现mysql数据同步的过程 同步原理 如上图所示,为canal同步mysql数据的原理的简单示…

SpringBoot系列之canal和kafka实现异步实时更新

SpringBoot系列之canal和kafka实现异步实时更新 实验开发环境 JDK 1.8SpringBoot2.2.1Maven 3.2+开发工具 IntelliJ IDEAsmartGit1、什么是阿里canal? canal是阿里开源的, 对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟…

Java:SpringBoot整合Canal+RabbitMQ组合实现MySQL数据监听

canal [kə’nl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 目录 一、MySQL设置二、启动Canal服务端三、通过Canal客户端消费数据四、通过RabbitMQ消费数据1、启动RabbitMQ2、修改canal配置3、消…

Kubernetes 学习19基于canel的网络策略

一、概述 1、我们说过&#xff0c;k8s的可用插件有很多&#xff0c;除了flannel之外&#xff0c;还有一个流行的叫做calico的组件&#xff0c;不过calico在很多项目中都会有这个名字被应用&#xff0c;所以他们把自己称为project calico&#xff0c;但是很多时候我们在kubernet…

基于canel的网络策略

基于canel的网络策略 canel工作的默认网段是192.168.0.0/16的网段 官网&#xff1a;https://docs.projectcalico.org/v3.2/introduction/ 参考地址&#xff1a; https://docs.projectcalico.org/v3.2/getting-started/kubernetes/installation/flannel 分别运行&#xff1a;…

REDIS11_缓存和数据库一致性如何保证、解决方案、提供Canel解决数据一致性问题

文章目录 ①. 缓存和数据库双写一致保证②. 缓存数据一致性-解决方案③. 缓存数据一致性-解决-Canal ①. 缓存和数据库双写一致保证 ①. 只要用缓存,就可能会涉及到缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性的问题 ②. 那么,如何解决一致性问题&#xff1f;提…

binlog+canel windows服务器下mysql数据的部署过程

1.开启mysql的binlog功能&#xff0c;并配置binlog模式为row。 1.1windos下修改mysql的my.ini文件&#xff08;该文件在C:\ProgramData\MySQL\MySQL Server 5.7&#xff0c;而不是C:\Program Files\MySQL\MySQL Server 5.7下的my-default.ini文件&#xff0c;C:\Pr…

十九、基于canel的网络策略

说明 Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设…

canel-1.1.5 canal.deployer安装

简介 canal [kə’nl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议&#xff0c;伪装自己为 MySQL slave &#xff0c;向 MySQL master 发送 dump 协…

Kubernetes基于canel的网络策略

Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设置一个…

binlog以及Canel

一、NDB存储引擎 在介绍binlog之前,需要先了解一下NDB NDB 存储引擎也叫NDB Cluster 存储引擎,主要用于MySQL Cluster 分布式集群环境。 NDB特点: • 分布式:分布式存储引擎,可以由多个NDBCluster存储引擎组成集群分别存放整体数据的一部分 • 支持事务:和Innodb一样…

canel的网络策略

资源&#xff1a; https://docs.projectcalico.org/v3.2/getting-started/kubernetes/installation/flannel 基于pod Egress 是Pod作为客户端(需要定义目标端口和目标地址) ingress 是pod作为服务端(需要定义目标地址和pod自己的端口) canel工作的默认网段是192.168.0.0/16的网…

超详细的Canal入门,看这篇就够了!

思维导图 文章已收录Github精选&#xff0c;欢迎Star&#xff1a;https://github.com/yehongzhi/learningSummary 前言 我们都知道一个系统最重要的是数据&#xff0c;数据是保存在数据库里。但是很多时候不单止要保存在数据库中&#xff0c;还要同步保存到Elastic Search、HB…

数据同步神器Canel-day01

背景 关于数据同步的方式有很多种&#xff0c;现在有一个场景需要将mysql数据库的数据主动同步到我们的工程中&#xff0c;并且能再mysql数据库客户端更改某一行的数据也能将数据同步到另一个数据库或者工程中&#xff0c;对于这种场景的使用我们应该怎么去实现呢&#xff1f;…

Canal使用流程、部署安装文档

文章目录 背景信息使用流程步骤一&#xff1a;准备MySQL数据源步骤二&#xff1a;创建索引步骤三&#xff1a;安装并启动Canal-server(Deployer)步骤四&#xff1a;部署Client-Adapter步骤五&#xff1a;验证增量数据同步步骤六&#xff1a;同步MySQL全量数据&#xff08;如果不…