binlog+canel windows服务器下mysql数据的部署过程

article/2025/10/3 12:28:45

1.开启mysql的binlog功能,并配置binlog模式为row。

  1.1windos下修改mysql的my.ini文件(该文件在C:\ProgramData\MySQL\MySQL Server 5.7,而不是C:\Program             Files\MySQL\MySQL Server 5.7下的my-default.ini文件,C:\ProgramData\MySQL\MySQL Server 5.7是一个隐藏文件,需要显示隐藏文件才能找得到)

 1.2修改my.ini增加如下配置

  1. log-bin=mysql-bin #添加这一行就ok

  2. binlog-format=ROW #选择row模式

  3. server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重

1.3在mysql中 配置canal数据库管理用户,配置相应权限(repication权限)


       

  CREATE USER canal IDENTIFIED BY 'canal';  GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES;

 1.4重启mysql服务

2.下载canel程序 : https://github.com/alibaba/canal/releases

 

2.1解压后的目录如下

2.2修改instance.properties文件的配置

# 主库的IP地址和端口号,目前是我本机的IP和端口
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# 上文中1.3新增了数据库的用户信息canelcanal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8#默认数据库,我自己创建的一个库20190426
canal.instance.defaultDatabaseName = 20190426  
# enable druid Decrypt database password
canal.instance.enableDruid=false

2.3 执行bin目录下的startup.bat,双击执行

2.4执行控制台如下

3.创建监听的maven工程 

添加POM依赖   

 <dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.0.12</version></dependency>

package com.banksteel.canel;import java.net.InetSocketAddress;
import java.util.List;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;public class ClientSample {public static void main(String args[]) {// 创建链接CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), "example", "", "");int batchSize = 1000;int emptyCount = 0;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();int totalEmtryCount = 1200;while (emptyCount < totalEmtryCount) {Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {emptyCount++;System.out.println("empty count : " + emptyCount);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}} else {emptyCount = 0;// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);printEntry(message.getEntries());}connector.ack(batchId); // 提交确认// connector.rollback(batchId); // 处理失败, 回滚数据}System.out.println("empty too many times, exit");} finally {connector.disconnect();}}private static void printEntry(List<Entry> entrys) {for (Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {continue;}RowChange rowChage = null;try {rowChage = RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);}EventType eventType = rowChage.getEventType();System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType == EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType == EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println("-------> before");printColumn(rowData.getBeforeColumnsList());System.out.println("-------> after");printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn( List<Column> columns) {for (Column column : columns) {System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());}}
}

 

4.代码执行后如下

5.在数据库中创建一张表,然后修改数据,在eclipse中查看监听的信息

CREATE TABLE `test1` (`uid` int(4) NOT NULL AUTO_INCREMENT,`name` varchar(10) NOT NULL,PRIMARY KEY (`uid`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

 


http://chatgpt.dhexx.cn/article/3ITeHrSo.shtml

相关文章

十九、基于canel的网络策略

说明 Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设…

canel-1.1.5 canal.deployer安装

简介 canal [kə’nl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议&#xff0c;伪装自己为 MySQL slave &#xff0c;向 MySQL master 发送 dump 协…

Kubernetes基于canel的网络策略

Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设置一个…

binlog以及Canel

一、NDB存储引擎 在介绍binlog之前,需要先了解一下NDB NDB 存储引擎也叫NDB Cluster 存储引擎,主要用于MySQL Cluster 分布式集群环境。 NDB特点: • 分布式:分布式存储引擎,可以由多个NDBCluster存储引擎组成集群分别存放整体数据的一部分 • 支持事务:和Innodb一样…

canel的网络策略

资源&#xff1a; https://docs.projectcalico.org/v3.2/getting-started/kubernetes/installation/flannel 基于pod Egress 是Pod作为客户端(需要定义目标端口和目标地址) ingress 是pod作为服务端(需要定义目标地址和pod自己的端口) canel工作的默认网段是192.168.0.0/16的网…

超详细的Canal入门,看这篇就够了!

思维导图 文章已收录Github精选&#xff0c;欢迎Star&#xff1a;https://github.com/yehongzhi/learningSummary 前言 我们都知道一个系统最重要的是数据&#xff0c;数据是保存在数据库里。但是很多时候不单止要保存在数据库中&#xff0c;还要同步保存到Elastic Search、HB…

数据同步神器Canel-day01

背景 关于数据同步的方式有很多种&#xff0c;现在有一个场景需要将mysql数据库的数据主动同步到我们的工程中&#xff0c;并且能再mysql数据库客户端更改某一行的数据也能将数据同步到另一个数据库或者工程中&#xff0c;对于这种场景的使用我们应该怎么去实现呢&#xff1f;…

Canal使用流程、部署安装文档

文章目录 背景信息使用流程步骤一&#xff1a;准备MySQL数据源步骤二&#xff1a;创建索引步骤三&#xff1a;安装并启动Canal-server(Deployer)步骤四&#xff1a;部署Client-Adapter步骤五&#xff1a;验证增量数据同步步骤六&#xff1a;同步MySQL全量数据&#xff08;如果不…

Canel-简介使用

简介 canal 的工作原理 MySQL 主从复制过程 ➢ Master 主库将改变记录&#xff0c;写到二进制日志(binary log)中 ➢ Slave 从库向 mysql master 发送 dump 协议&#xff0c;将 master 主库的 binary log events 拷贝到它的中继日志(relay log)&#xff1b; ➢ Slave 从…

检查页面Session是否过期,过期执行相应操作 解决方法

how to check session is expired or not if expired then redirect to login page 在项目中&#xff0c;如果客户打开页面时间过久容易导致页面Session过期&#xff0c;再进行任何操作时都会提示“Asp.Net session has expired”&#xff0c;这样毕竟都用户不太友好&#xff0…

thinkphp如何有效的设置session过期时间

thinkphp提供了一个参数让我们配置session过期时间。 SESSION_OPTIONS array(expire > 3600 ); 然而这一配置是否真的有效&#xff1f;在多次测试之后&#xff0c;不遂人意。 why&#xff1f;那我们试着从源码上分析这个配置参数的&#xff0c;它是怎么让尝试着然我们的…

session过期时间设置

设置session过期有三种方法&#xff1a; 1.在tomcat中进行设置 tomcat的conf文件下的&#xff0c;web.xml文件中 tomcat默认session超时时间为30分钟&#xff0c;可以根据需要修改&#xff0c;负数或0为不限制session失效时间 这里要注意这个session设置的时间是根据服务器来…

springboot+shiro中自定义session过期时间

在springboot工程中&#xff0c;使用shiro作为权限框架&#xff0c;并采用redis来管理session时&#xff0c;如何自定义session过期时间&#xff1f; 上面与会话或缓存相关的组件有&#xff1a; Session Manager&#xff1a;会话管理器Session DAO&#xff1a;会话 DAO&#…

JAVA WEB 设置session过期时间

1.在web容器中设置 &#xff08;以tomcat为例,Tomcat默认session超时时间为30分钟&#xff09; 在tomcat/conf/web.xml里面进行配置&#xff0c;单位是分钟&#xff0c;永不过期可以设置-1 <session-config> <!-- 时间单位为分钟--> <session-timeout>30&…

springboot2.0设置session过期时间。

目的&#xff1a;springboot2.0设置session过期时间。 网上很多设置 springboot session 过期时间&#xff0c;已经不适合 springboot2.0. 下面这个我亲测有效。 请注意格式&#xff1a;我这个用的是 application.yml servlet:session:timeout: 3600s 所有文章优先发布在个人…

Java 设置session过期时间

设置session过期或超时时间 设置session的过期或超时时间&#xff0c;有三个地方&#xff1a; a、tomcat的web.xml中&#xff0c;该单位为分&#xff1a; Xml代码 <session-config> <session-timeout>720</session-timeout> </session-config>…

web 项目中设置session过期时间

java web项目中要想设置session过期时间&#xff0c;有三种设置方法&#xff0c;都是给与某个过期时间值&#xff0c;其中-1 代表session永远不会过期。 1. 第一种方式&#xff1a;通过代码设置方式&#xff0c;其中600表示600秒 2. 第二种方式&#xff1a;通过web.xml方式&am…

session会话过期时间设置

具体设置很简单&#xff0c;方法有三种&#xff1a; &#xff08;1&#xff09;在主页面或者公共页面中加入&#xff1a;session.setMaxInactiveInterval(900); 参数900单位是秒&#xff0c;即在没有活动15分钟后&#xff0c;session将失效。设置为-1将永不关闭。 这里要注意…

关于Session过期/失效的理解,session与cookie的交互

一直好奇关于Session的过期&#xff0c;一种说法是关闭浏览器即Session失效&#xff0c;另一种说法是可以设置Session的过期时间&#xff0c;时间到了自动过期。 这两种说法到底是怎么回事&#xff1f;Session过期跟Cookie过期又有什么关系&#xff1f; 网上搜了几篇相关文章…

数据库 存储过程

创建存储过程 create procedures_student sex varchar&#xff08;10&#xff09; as select * from 学生信息 where 性别sex 这样就创建了一个存储过程 exec proc_student sex女 使用带默认值的参数 create proc p_employee departmentid varchar&…