Canel-简介使用

article/2025/10/3 13:39:14

简介

canal 的工作原理

MySQL 主从复制过程
Master 主库将改变记录,写到二进制日志(binary log)中
Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log
events 拷贝到它的中继日志(relay log);
Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

canal 的工作原理

很简单,就是把自己伪装成 slave,假装从 master 复制数据

 

开始使用 

前期准备

开启mysql的binlog

sudo vi /etc/mysql/my.cnf
[mysqld]
server-id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall

参数说明 

  • 在[mysqld] ,log-bin=mysql-bin
这 个 表 示 binlog 日 志 的 前 缀 是 mysql-bin , 以后生成的日志文件就是
mysql-bin.123456 的文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大
小的阈值时,新生一个文件,按顺序编号。
  • mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。
  • binlog-do-db指定监控哪个数据库

配置好以后重启mysql

sudo systemctl restart mysqld
赋权限
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 
'canal'@'%' IDENTIFIED BY 'canal' ;

使用

canal 架构

下载

https://github.com/alibaba/canal/releases  

主体配置

tar -zxvf canal.deployer-1.1.4.tar.gz

修改配置文件canal.properties

vi canal.properties
canal.serverMode = kafka
canal.mq.servers = master:9092,node1:9092,node2:9092
通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/
下的 每一个 example 即是一个实例 ,每个实例下面都有独立的配置文件。默认只有一个实
例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,
并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的
canal.destinations=实例 1,实例 2,实例 3
canal.destinations = example

实例配置

修改 instance.properties
我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在
conf/example 目录下
vi instance.properties
配置连接 MySQL 的用户名和密码 ,默认就是我们前面授权的 canal
canal.instance.dbUsername=root
canal.instance.dbPassword=root
修改输出到 Kafka 的主题以及分区数
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱
binlog 的顺序
# mq config
canal.mq.topic=gmall_db
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config,下面的分区数和自己主题的partition数目相同
canal.mq.partitionsNum=4

启动

bin/startup.sh

测试

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic gmall_db

然后往mysql里面监听的数据库插入一条数据看kafka是否有数据接收到

停止

bin/stop.sh

canal 高可用

       这种 zookeeper 为观察者监控的模式, 只能实现高可用,而不是负载均衡 ,即同一时点只
有一个 canal-server 节点能够监控某个数据源,只要这个节点能够正常工作,那么其他监控这
个数据源的 canal-server 只能做 stand-by ,直到工作节点停掉,其他 canal-server 节点才
能抢占。因为有一个 stand-by 也要占用资源,同时 canal 传输数据宕机的情况也比较少,所
以好多企业是不配置 canal 的高可用的。

接收到数据的格式例子

添加

{"data": [{"id": "1547090631092903951","user_id": "1413","nick_name": null,"head_img": null,"sku_id": "16","spu_id": "4","order_id": "5991","appraise": "1201","comment_txt": "评论内容:51811533721518964616668687831874922222995433369391","create_time": "2022-06-28 13:28:44","operate_time": null}],"database": "gmall","es": 1657690124000,"id": 211,"isDdl": false,"mysqlType": {"id": "bigint(20)","user_id": "bigint(20)","nick_name": "varchar(20)","head_img": "varchar(200)","sku_id": "bigint(20)","spu_id": "bigint(20)","order_id": "bigint(20)","appraise": "varchar(10)","comment_txt": "varchar(2000)","create_time": "datetime","operate_time": "datetime"},"old": null,"pkNames": ["id"],"sql": "","sqlType": {"id": -5,"user_id": -5,"nick_name": 12,"head_img": 12,"sku_id": -5,"spu_id": -5,"order_id": -5,"appraise": 12,"comment_txt": 12,"create_time": 93,"operate_time": 93},"table": "comment_info","ts": 1657690130536,"type": "INSERT"
}

删除

{"data": [{"dic_code": "1102","dic_name": "微信","parent_code": "11","create_time": null,"operate_time": null}],"database": "gmall","es": 1657690329000,"id": 212,"isDdl": false,"mysqlType": {"dic_code": "varchar(10)","dic_name": "varchar(100)","parent_code": "varchar(10)","create_time": "datetime","operate_time": "datetime"},"old": null,"pkNames": null,"sql": "","sqlType": {"dic_code": 12,"dic_name": 12,"parent_code": 12,"create_time": 93,"operate_time": 93},"table": "base_dic","ts": 1657690329374,"type": "DELETE"
}

修改

{"data": [{"id": "1","login_name": "g4hpd2mp2","nick_name": "修改","passwd": null,"name": "沈昌成","phone_num": "13973123428","email": "g4hpd2mp2@qq.com","head_img": null,"user_level": "1","birthday": "2004-08-10","gender": "M","create_time": "2020-06-10 21:48:29","operate_time": "2022-06-28 13:28:34","status": null}],"database": "gmall","es": 1657690400000,"id": 213,"isDdl": false,"mysqlType": {"id": "bigint(20)","login_name": "varchar(200)","nick_name": "varchar(200)","passwd": "varchar(200)","name": "varchar(200)","phone_num": "varchar(200)","email": "varchar(200)","head_img": "varchar(200)","user_level": "varchar(200)","birthday": "date","gender": "varchar(1)","create_time": "datetime","operate_time": "datetime","status": "varchar(200)"},"old": [{"nick_name": "雄琛"}],"pkNames": ["id"],"sql": "","sqlType": {"id": -5,"login_name": 12,"nick_name": 12,"passwd": 12,"name": 12,"phone_num": 12,"email": 12,"head_img": 12,"user_level": 12,"birthday": 91,"gender": 12,"create_time": 93,"operate_time": 93,"status": 12},"table": "user_info","ts": 1657690400556,"type": "UPDATE"
}


http://chatgpt.dhexx.cn/article/3cIJ5Akb.shtml

相关文章

检查页面Session是否过期,过期执行相应操作 解决方法

how to check session is expired or not if expired then redirect to login page 在项目中,如果客户打开页面时间过久容易导致页面Session过期,再进行任何操作时都会提示“Asp.Net session has expired”,这样毕竟都用户不太友好&#xff0…

thinkphp如何有效的设置session过期时间

thinkphp提供了一个参数让我们配置session过期时间。 SESSION_OPTIONS array(expire > 3600 ); 然而这一配置是否真的有效?在多次测试之后,不遂人意。 why?那我们试着从源码上分析这个配置参数的,它是怎么让尝试着然我们的…

session过期时间设置

设置session过期有三种方法: 1.在tomcat中进行设置 tomcat的conf文件下的,web.xml文件中 tomcat默认session超时时间为30分钟,可以根据需要修改,负数或0为不限制session失效时间 这里要注意这个session设置的时间是根据服务器来…

springboot+shiro中自定义session过期时间

在springboot工程中,使用shiro作为权限框架,并采用redis来管理session时,如何自定义session过期时间? 上面与会话或缓存相关的组件有: Session Manager:会话管理器Session DAO:会话 DAO&#…

JAVA WEB 设置session过期时间

1.在web容器中设置 &#xff08;以tomcat为例,Tomcat默认session超时时间为30分钟&#xff09; 在tomcat/conf/web.xml里面进行配置&#xff0c;单位是分钟&#xff0c;永不过期可以设置-1 <session-config> <!-- 时间单位为分钟--> <session-timeout>30&…

springboot2.0设置session过期时间。

目的&#xff1a;springboot2.0设置session过期时间。 网上很多设置 springboot session 过期时间&#xff0c;已经不适合 springboot2.0. 下面这个我亲测有效。 请注意格式&#xff1a;我这个用的是 application.yml servlet:session:timeout: 3600s 所有文章优先发布在个人…

Java 设置session过期时间

设置session过期或超时时间 设置session的过期或超时时间&#xff0c;有三个地方&#xff1a; a、tomcat的web.xml中&#xff0c;该单位为分&#xff1a; Xml代码 <session-config> <session-timeout>720</session-timeout> </session-config>…

web 项目中设置session过期时间

java web项目中要想设置session过期时间&#xff0c;有三种设置方法&#xff0c;都是给与某个过期时间值&#xff0c;其中-1 代表session永远不会过期。 1. 第一种方式&#xff1a;通过代码设置方式&#xff0c;其中600表示600秒 2. 第二种方式&#xff1a;通过web.xml方式&am…

session会话过期时间设置

具体设置很简单&#xff0c;方法有三种&#xff1a; &#xff08;1&#xff09;在主页面或者公共页面中加入&#xff1a;session.setMaxInactiveInterval(900); 参数900单位是秒&#xff0c;即在没有活动15分钟后&#xff0c;session将失效。设置为-1将永不关闭。 这里要注意…

关于Session过期/失效的理解,session与cookie的交互

一直好奇关于Session的过期&#xff0c;一种说法是关闭浏览器即Session失效&#xff0c;另一种说法是可以设置Session的过期时间&#xff0c;时间到了自动过期。 这两种说法到底是怎么回事&#xff1f;Session过期跟Cookie过期又有什么关系&#xff1f; 网上搜了几篇相关文章…

数据库 存储过程

创建存储过程 create procedures_student sex varchar&#xff08;10&#xff09; as select * from 学生信息 where 性别sex 这样就创建了一个存储过程 exec proc_student sex女 使用带默认值的参数 create proc p_employee departmentid varchar&…

数据库--存储过程

介绍 对sql语句进行封装、复用 创建、调用 --存储过程 --创建 create procedure p1() beginselect count(*) from t_test;end;--调用 call p1();存储过程查看、删除 --查看 select * from information_schema.ROUTINES WHERE ROUTINE_SCHEMA test SHOW create procedure p1…

SQL Sever数据库存储过程

一、背景介绍 1.遇到存储过程 回顾之前知识&#xff0c;使用在当下&#xff08;毕业设计&#xff09; 2.了解周边知识 二、思路&方案 1.了解存储过程定义、语法、种类 2.存储过程有什么优缺点 3。存储过程与触发器和函数的联系 三、过程 1.什么是存储过程&#xff1f;…

数据库MySQL —— 存储过程

目录 一、介绍 二、基本语法 三、变量 1. 系统变量 2. 用户自定义变量 3. 局部变量 四、流程控制语句 1. if判断 2. 参数 3. case 4. 循环 4.1 while 4.2 repeat 4.3 loop 五、游标 - cursor 六、条件处理程序 - handler 七、存储函数 一、介绍 存储过…

mysql数据库之存储过程

一、存储过程简介。 存储过程是事先经过编译并存储在数据库中的一段sql语句的集合&#xff0c;调用存储过程可以简化应用开发人员的很多工作&#xff0c;减少数据在数据库和应用服务器之间的传输&#xff0c;对于提高数据处理的效率是也有好处的。 存储过程思想上很简单&…

MySQL数据库存储过程

存储过程相关命令汇总存储过程存储过程优化再说存储过程的输出参数再说WHILE 和 REPEAT循环 存储过程&#xff08;Stored Procedure&#xff09;是在大型数据库系统中&#xff0c;一组为了完成特定功能的SQL 语句集&#xff0c;存储在数据库中&#xff0c;经过第一次编译后再次…

MySQL数据库存储过程讲解与实例

存储过程简介 SQL语句需要先编译然后执行&#xff0c;而存储过程&#xff08;Stored Procedure&#xff09;是一组为了完成特定功能的SQL语句集&#xff0c;经编译后存储在数据库中&#xff0c;用户通过指定存储过程的名字并给定参数&#xff08;如果该存储过程带有参数&#x…

mysql数据库存储过程详解

1.什么是存储过程 存储过程&#xff08;Stored Procedure&#xff09;是在大型数据库系统中&#xff0c;一组为了完成特定功能的SQL 语句集&#xff0c;它存储在数据库中&#xff0c;一次编译后永久有效&#xff0c;用户通过指定存储过程的名字并给出参数&#xff08;如果该存…

MySQL中的存储过程(详细篇)

文章目录 概述优点缺点 MySQL存储过程的定义存储过程的基本语句格式存储过程的使用定义一个存储过程定义一个有参数的存储过程定义一个流程控制语句 IF ELSE定义一个条件控制语句 CASE定义一个循环语句 WHILE定义一个循环语句 REPEAT UNTLL定义一个循环语句 LOOP使用存储过程插…

跨域及cors解决跨域

1.什么是跨域 出于浏览器的同源策略限制。同源策略&#xff08;Sameoriginpolicy&#xff09;是一种约定&#xff0c;它是浏览器最核心也最基本的安全功能&#xff0c;如果缺少了同源策略&#xff0c;则浏览器的正常功能可能都会受到影响。可以说Web是构建在同源策略基础之上的…