4、Canal的 instance模块

article/2025/10/3 11:31:16

一、总体结构

在这里插入图片描述

如确所述,Canal server的模式有两种:manager和spring

在core模块中,定义了CanalInstance接口,以及其抽象类子类AbstractCanalInstance。

在spring模块,提供了基于spring配置方式的CanalInstanceWithSpring实现,即CanalInstance实例的创建,通过spring配置文件来创建。

在manager模块中,提供了基于manager配置方式的CanalInstanceWithManager实现,即CanalInstance实例根据远程配置中心的内容来创建。

CanalInstance类图继承关系如下所示:

在这里插入图片描述

二、源码分析

1、CanalInstance接口

Canal官方文档中有一张图描述了CanalInstance的4个主要组件,如下:

在这里插入图片描述

其中:

event parser:数据源接入,模拟slave协议和master进行交互,协议解析(CanalEventParser是数据复制控制器)

event sink:parser和store链接器,进行数据过滤,加工,分发的工作(CanalEventSink是event事件消费者,剥离filter/sink为独立的两个动作,方便在快速判断数据是否有效)

event store:数据存储(CanalEventStore是canel数据存储接口)

meta manager:增量订阅/消费binlog元数据位置存储(CanalMetaManager是meta信息管理器)

public interface CanalInstance extends CanalLifeCycle {

//这个instance对应的destination
String getDestination();
//数据源接入,模拟slave协议和master进行交互,协议解析,位于canal.parse模块中
CanalEventParser getEventParser();
//parser和store链接器,进行数据过滤,加工,分发的工作,位于canal.sink模块中
CanalEventSink getEventSink();
//数据存储,位于canal.store模块中
CanalEventStore getEventStore();
//增量订阅&消费元数据管理器,位于canal.meta模块中
CanalMetaManager getMetaManager();
//告警,位于canal.common块中
CanalAlarmHandler getAlarmHandler();

/*** 客户端发生订阅/取消订阅行为*/
boolean subscribeChange(ClientIdentity identity);
//消息队列的配置模块,位于instance.core模块
CanalMQConfig getMqConfig();

}
可以看到,instance模块其实是把这几个模块组装在一起,为客户端的binlog订阅请求提供服务。有些模块都有多种实现,不同组合方式,最终确定了一个CanalInstance的工作逻辑。

CanalEventParser接口实现类:

MysqlEventParser:伪装成单个mysql实例的slave解析binglog日志

GroupEventParser:伪装成多个mysql实例的slave解析binglog日志。内部维护了多个CanalEventParser。主要应用场景是分库分表:比如产品数据拆分了4个库,位于不同的mysql实例上。

正常情况下,我们需要配置四个CanalInstance。对应的,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。为了方便业务使用,此时我们可以让CanalInstance引用一个GroupEventParser,

由GroupEventParser内部维护4个MysqlEventParser去4个不同的mysql实例去拉取binlog,最终合并到一起。此时业务只需要启动1个客户端,链接这个CanalInstance即可.

LocalBinlogEventParser:解析本地的mysql binlog。例如将mysql的binlog文件拷贝到canal的机器上进行解析。

在这里插入图片描述

CanalEventSink接口实现类:

EntryEventSink:mysql binlog数据对象输出

GroupEventSink:基于归并排序的sink处理

CanalEventStore接口实现类:

目前只有MemoryEventStoreWithBuffer,基于内存buffer构建内存memory store

CanalMetaManager:

ZooKeeperMetaManager:将元数据存存储到zk中

MemoryMetaManager:将元数据存储到内存中

MixedMetaManager:组合memory + zookeeper的使用模式

PeriodMixedMetaManager:基于定时刷新的策略的mixed实现

FileMixedMetaManager:先写内存,然后定时刷新数据到File

关于这些实现的具体细节,我们在相应模块的源码分析时,进行讲解。目前只需要知道,一些组件有多种实现,因此组合工作方式有多种。

2、AbstractCanalInstance类

public class AbstractCanalInstance extends AbstractCanalLifeCycle implements CanalInstance {

private static final Logger                      logger = LoggerFactory.getLogger(AbstractCanalInstance.class);protected Long                                   canalId;                                                      // 和manager交互唯一标示
protected String                                 destination;                                                  // 队列名字
protected CanalEventStore<Event>                 eventStore;                                                   // 有序队列protected CanalEventParser                       eventParser;                                                  // 解析对应的数据信息
protected CanalEventSink<List<CanalEntry.Entry>> eventSink;                                                    // 链接parse和store的桥接器
protected CanalMetaManager                       metaManager;                                                  // 消费信息管理器
protected CanalAlarmHandler                      alarmHandler;                                                 // alarm报警机制
protected CanalMQConfig                          mqConfig;                                                     // mq的配置


}
需要注意的是,在AbstractCanalInstance中,这些字段都是在AbstractCanalInstance的子类中进行赋值的。

2.1 start方法

public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();
}

    if (!alarmHandler.isStart()) {alarmHandler.start();}if (!eventStore.isStart()) {eventStore.start();}if (!eventSink.isStart()) {eventSink.start();}//模板方法,定义启动前后的操作,具体实现在子类实现if (!eventParser.isStart()) {beforeStartEventParser(eventParser); eventParser.start();afterStartEventParser(eventParser);}logger.info("start successful....");
}

官方关于instance模块构成的图中,把metaManager放在最下面,说明其是最基础的部分,因此应该最先启动。

而eventParser依赖于eventSink,需要把自己解析的binlog交给其加工过滤,而eventSink又要把处理后的数据交给eventStore进行存储。因此依赖关系如下:eventStore—>eventSink—>eventParser ,启动的时候也要按照这个顺序启动。

2.2stop方法

public void stop() {
super.stop();
logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });
//也是模板方法,停止前后做点操作
if (eventParser.isStart()) {
beforeStopEventParser(eventParser);
eventParser.stop();
afterStopEventParser(eventParser);
}

    if (eventSink.isStart()) {eventSink.stop();}if (eventStore.isStart()) {eventStore.stop();}if (metaManager.isStart()) {metaManager.stop();}if (alarmHandler.isStart()) {alarmHandler.stop();}logger.info("stop successful....");
}

停止顺序是和启动方法倒过来的。

2.3 eventParser的特殊处理

关于eventParser有以下描述:

在这里插入图片描述

eventParser在启动之前,需要先启动CanalLogPositionManager和CanalHAController。

关于CanalLogPositionManager,做一点补充说明。

mysql在主从同步过程中,要求slave自己维护binlog的消费进度信息。canal伪装成slave,因此也要维护这样的信息。

beforeStartEventParser方法:

protected void beforeStartEventParser(CanalEventParser eventParser) {
//1、判断eventParser的类型是否是GroupEventParser
boolean isGroup = (eventParser instanceof GroupEventParser);
//2、如果是GroupEventParser,则循环启动其内部包含的每一个CanalEventParser,依次调用startEventParserInternal方法
if (isGroup) {
// 处理group的模式
List eventParsers = ((GroupEventParser) eventParser).getEventParsers();
for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动
startEventParserInternal(singleEventParser, true);
}
//如果不是,说明是一个普通的CanalEventParser,直接调用startEventParserInternal方法
} else {
startEventParserInternal(eventParser, false);
}
}
beforeStartEventParser方法的作用是eventParser前做的一些特殊处理。首先会判断eventParser的类型是否是GroupEventParser,这是为了处理分库分表的情况。如果是,循环其包含的所有CanalEventParser,依次调用startEventParserInternal方法;

否则直接调用startEventParserInternal

/**

  • 初始化单个eventParser,不需要考虑group
    */
    protected void startEventParserInternal(CanalEventParser eventParser, boolean isGroup) {
    // 1 、启动CanalLogPositionManager
    if (eventParser instanceof AbstractEventParser) {
    AbstractEventParser abstractEventParser = (AbstractEventParser) eventParser;
    CanalLogPositionManager logPositionManager = abstractEventParser.getLogPositionManager();
    if (!logPositionManager.isStart()) {
    logPositionManager.start();
    }
    }
    // 2 、启动CanalHAController
    if (eventParser instanceof MysqlEventParser) {
    MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
    CanalHAController haController = mysqlEventParser.getHaController();
    if (haController instanceof HeartBeatHAController) {
    ((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser);
    }
    if (!haController.isStart()) {
    haController.start();
    }
    }
    }
    afterStartEventParser方法

// around event parser, default impl
protected void afterStartEventParser(CanalEventParser eventParser) {
// 读取一下历史订阅的filter信息
List clientIdentitys = metaManager.listAllSubscribeInfo(destination);
for (ClientIdentity clientIdentity : clientIdentitys) {
subscribeChange(clientIdentity);
}
}

这个方法内部主要是通过metaManager读取一下历史订阅过这个CanalInstance的客户端信息,然后更新一下filter。

subscribeChange方法:

public boolean subscribeChange(ClientIdentity identity) {
if (StringUtils.isNotEmpty(identity.getFilter())) {
logger.info("subscribe filter change to " + identity.getFilter());
AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

        boolean isGroup = (eventParser instanceof GroupEventParser);if (isGroup) {// 处理group的模式List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();for (CanalEventParser singleEventParser : eventParsers) {// 需要遍历启动if(singleEventParser instanceof AbstractEventParser) {((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);}}} else {if(eventParser instanceof AbstractEventParser) {((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);}}}// filter的处理规则// a. parser处理数据过滤处理// b. sink处理数据的路由&分发,一份parse数据经过sink后可以分发为多份,每份的数据可以根据自己的过滤规则不同而有不同的数据// 后续内存版的一对多分发,可以考虑return true;
}

3、CanalInstanceWithSpring 类

CanalInstanceWithSpring是AbstractCanalInstance的子类,提供了一些set方法为instance的组成模块赋值,如下所示:

public class CanalInstanceWithSpring extends AbstractCanalInstance {

private static final Logger logger = LoggerFactory.getLogger(CanalInstanceWithSpring.class);public void start() {logger.info("start CannalInstance for {}-{} ", new Object[] { 1, destination });super.start();
}// ======== setter ========public void setDestination(String destination) {this.destination = destination;
}public void setEventParser(CanalEventParser eventParser) {this.eventParser = eventParser;
}public void setEventSink(CanalEventSink<List<CanalEntry.Entry>> eventSink) {this.eventSink = eventSink;
}public void setEventStore(CanalEventStore<Event> eventStore) {this.eventStore = eventStore;
}public void setMetaManager(CanalMetaManager metaManager) {this.metaManager = metaManager;
}public void setAlarmHandler(CanalAlarmHandler alarmHandler) {this.alarmHandler = alarmHandler;
}public void setMqConfig(CanalMQConfig mqConfig) {this.mqConfig = mqConfig;
}

}
当我们配置加载方式为spring时,创建的CanalInstance实例类型都是CanalInstanceWithSpring。canal将会寻找本地的spring配置文件来创建instance实例。canal默认提供了一下几种spring配置文件:

4

在这里插入图片描述

spring/memory-instance.xml

spring/file-instance.xml

spring/default-instance.xml

spring/group-instance.xml

在这4个配置文件中,我们无一例外的都可以看到以下bean配置:

这四个配置文件创建的bean实例都是CanalInstanceWithSpring,但是工作方式却是不同的,因为在不同的配置文件中,eventParser、eventSink、eventStore、metaManager这几个属性引用的具体实现不同。

memory-instance.xml

所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析

特点:速度最快,依赖最少(不需要zookeeper)

场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境

<bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink"><property name="eventStore" ref="eventStore" /><property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
</bean><bean id="eventParser" parent="baseEventParser">.......

file-instance.xml

所有的组件(parser , sink , store)都选择了基于file持久化模式,注意,不支持HA机制.

特点:支持单机持久化

场景:生产环境,无HA需求,简单可用.

<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer"><property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" /><property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" /><property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" /><property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" /><property name="raw" value="${canal.instance.memory.rawEntry:true}" />
</bean><bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink"><property name="eventStore" ref="eventStore" /><property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
</bean>


default-instance.xml:

所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.

特点:支持HA

场景:生产环境,集群化部署.

${canal.zkServers:127.0.0.1:2181} ... ... group-instance.xml:

主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。

场景:分库业务。 比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,

可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.

<bean id="eventStore" class="com.alibaba.otter.canal.store.memory.MemoryEventStoreWithBuffer"><property name="bufferSize" value="${canal.instance.memory.buffer.size:16384}" /><property name="bufferMemUnit" value="${canal.instance.memory.buffer.memunit:1024}" /><property name="batchMode" value="${canal.instance.memory.batch.mode:MEMSIZE}" /><property name="ddlIsolation" value="${canal.instance.get.ddl.isolation:false}" /><property name="raw" value="${canal.instance.memory.rawEntry:true}" />
</bean><bean id="eventSink" class="com.alibaba.otter.canal.sink.entry.EntryEventSink"><property name="eventStore" ref="eventStore" /><property name="filterTransactionEntry" value="${canal.instance.filter.transaction.entry:false}"/>
</bean><bean id="eventParser" class="com.alibaba.otter.canal.parse.inbound.group.GroupEventParser">
...........

这几个不同的spring配置文件中,最主要的就是metaManager 和eventParser 这两个配置有所不同,eventStore 、和eventSink 定义都是相同的。这是因为:

eventStore:目前的开源版本中eventStore只有一种基于内存的实现,所以配置都相同

eventSink:其作用是eventParser和eventStore的链接器,进行数据过滤,加工,分发的工作。不涉及存储,也就没有必要针对内存、file、或者zk进行区分。


http://chatgpt.dhexx.cn/article/RTAfDWbp.shtml

相关文章

canal实现mysql数据同步

前言 canal是实现mysql数据备份,异地灾备,异地数据同步等重要的中间件,在实际的业务场景中有着广泛的使用,本文基于小编所在项目中一个异地数据同步的场景为例,通过案例演示下利用canal实现mysql数据同步的过程 同步原理 如上图所示,为canal同步mysql数据的原理的简单示…

SpringBoot系列之canal和kafka实现异步实时更新

SpringBoot系列之canal和kafka实现异步实时更新 实验开发环境 JDK 1.8SpringBoot2.2.1Maven 3.2+开发工具 IntelliJ IDEAsmartGit1、什么是阿里canal? canal是阿里开源的, 对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟…

Java:SpringBoot整合Canal+RabbitMQ组合实现MySQL数据监听

canal [kə’nl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 目录 一、MySQL设置二、启动Canal服务端三、通过Canal客户端消费数据四、通过RabbitMQ消费数据1、启动RabbitMQ2、修改canal配置3、消…

Kubernetes 学习19基于canel的网络策略

一、概述 1、我们说过&#xff0c;k8s的可用插件有很多&#xff0c;除了flannel之外&#xff0c;还有一个流行的叫做calico的组件&#xff0c;不过calico在很多项目中都会有这个名字被应用&#xff0c;所以他们把自己称为project calico&#xff0c;但是很多时候我们在kubernet…

基于canel的网络策略

基于canel的网络策略 canel工作的默认网段是192.168.0.0/16的网段 官网&#xff1a;https://docs.projectcalico.org/v3.2/introduction/ 参考地址&#xff1a; https://docs.projectcalico.org/v3.2/getting-started/kubernetes/installation/flannel 分别运行&#xff1a;…

REDIS11_缓存和数据库一致性如何保证、解决方案、提供Canel解决数据一致性问题

文章目录 ①. 缓存和数据库双写一致保证②. 缓存数据一致性-解决方案③. 缓存数据一致性-解决-Canal ①. 缓存和数据库双写一致保证 ①. 只要用缓存,就可能会涉及到缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性的问题 ②. 那么,如何解决一致性问题&#xff1f;提…

binlog+canel windows服务器下mysql数据的部署过程

1.开启mysql的binlog功能&#xff0c;并配置binlog模式为row。 1.1windos下修改mysql的my.ini文件&#xff08;该文件在C:\ProgramData\MySQL\MySQL Server 5.7&#xff0c;而不是C:\Program Files\MySQL\MySQL Server 5.7下的my-default.ini文件&#xff0c;C:\Pr…

十九、基于canel的网络策略

说明 Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设…

canel-1.1.5 canal.deployer安装

简介 canal [kə’nl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议&#xff0c;伪装自己为 MySQL slave &#xff0c;向 MySQL master 发送 dump 协…

Kubernetes基于canel的网络策略

Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设置一个…

binlog以及Canel

一、NDB存储引擎 在介绍binlog之前,需要先了解一下NDB NDB 存储引擎也叫NDB Cluster 存储引擎,主要用于MySQL Cluster 分布式集群环境。 NDB特点: • 分布式:分布式存储引擎,可以由多个NDBCluster存储引擎组成集群分别存放整体数据的一部分 • 支持事务:和Innodb一样…

canel的网络策略

资源&#xff1a; https://docs.projectcalico.org/v3.2/getting-started/kubernetes/installation/flannel 基于pod Egress 是Pod作为客户端(需要定义目标端口和目标地址) ingress 是pod作为服务端(需要定义目标地址和pod自己的端口) canel工作的默认网段是192.168.0.0/16的网…

超详细的Canal入门,看这篇就够了!

思维导图 文章已收录Github精选&#xff0c;欢迎Star&#xff1a;https://github.com/yehongzhi/learningSummary 前言 我们都知道一个系统最重要的是数据&#xff0c;数据是保存在数据库里。但是很多时候不单止要保存在数据库中&#xff0c;还要同步保存到Elastic Search、HB…

数据同步神器Canel-day01

背景 关于数据同步的方式有很多种&#xff0c;现在有一个场景需要将mysql数据库的数据主动同步到我们的工程中&#xff0c;并且能再mysql数据库客户端更改某一行的数据也能将数据同步到另一个数据库或者工程中&#xff0c;对于这种场景的使用我们应该怎么去实现呢&#xff1f;…

Canal使用流程、部署安装文档

文章目录 背景信息使用流程步骤一&#xff1a;准备MySQL数据源步骤二&#xff1a;创建索引步骤三&#xff1a;安装并启动Canal-server(Deployer)步骤四&#xff1a;部署Client-Adapter步骤五&#xff1a;验证增量数据同步步骤六&#xff1a;同步MySQL全量数据&#xff08;如果不…

Canel-简介使用

简介 canal 的工作原理 MySQL 主从复制过程 ➢ Master 主库将改变记录&#xff0c;写到二进制日志(binary log)中 ➢ Slave 从库向 mysql master 发送 dump 协议&#xff0c;将 master 主库的 binary log events 拷贝到它的中继日志(relay log)&#xff1b; ➢ Slave 从…

检查页面Session是否过期,过期执行相应操作 解决方法

how to check session is expired or not if expired then redirect to login page 在项目中&#xff0c;如果客户打开页面时间过久容易导致页面Session过期&#xff0c;再进行任何操作时都会提示“Asp.Net session has expired”&#xff0c;这样毕竟都用户不太友好&#xff0…

thinkphp如何有效的设置session过期时间

thinkphp提供了一个参数让我们配置session过期时间。 SESSION_OPTIONS array(expire > 3600 ); 然而这一配置是否真的有效&#xff1f;在多次测试之后&#xff0c;不遂人意。 why&#xff1f;那我们试着从源码上分析这个配置参数的&#xff0c;它是怎么让尝试着然我们的…

session过期时间设置

设置session过期有三种方法&#xff1a; 1.在tomcat中进行设置 tomcat的conf文件下的&#xff0c;web.xml文件中 tomcat默认session超时时间为30分钟&#xff0c;可以根据需要修改&#xff0c;负数或0为不限制session失效时间 这里要注意这个session设置的时间是根据服务器来…

springboot+shiro中自定义session过期时间

在springboot工程中&#xff0c;使用shiro作为权限框架&#xff0c;并采用redis来管理session时&#xff0c;如何自定义session过期时间&#xff1f; 上面与会话或缓存相关的组件有&#xff1a; Session Manager&#xff1a;会话管理器Session DAO&#xff1a;会话 DAO&#…