聊聊数据同步方案

article/2025/9/11 23:10:18

文章目录

  • 常用的数据同步方案
    • 数据库迁移场景
    • 数据同步场景
      • 应用代码中同步
      • 定时任务同步
      • 通过MQ实现同步
      • 通过CDC实现实时同步
  • CDC(change data capture,数据变更抓取)
    • Canal
      • 基于日志增量订阅&消费支持的业务
      • 工作原理
      • Mysql主备复制实现
      • Canal架构
      • Canal是怎么假装成是Mysql Slave的?
      • Canal是怎么解析binlog的?
      • Quick Start
        • 准备
        • 启动
      • 变更分发
  • 总结

常用的数据同步方案

Q:大家知道的数据库同步方案或者工具有哪些?

数据库迁移场景

以Mysql数据库迁移为例,数据库常用迁移方案有停机迁移和平滑迁移。

平滑迁移又分为双写和CDC(数据变更抓取)。

双写:即所有写入操作同时写入旧表和新表,这种方式可以完全控制应用代码如何写数据库,听上去简单明了。但它会引入复杂的分布式一致性问题:要保证新旧库中两张表数据一致,双写操作就必须在一个分布式事务中完成,而分布式事务的代价太高了。
CDC:通过数据源的事务日志抓取数据源变更解决数据同步问题

数据同步场景

微服务开发环境下,为了提高搜索效率,以及搜索的精准度,会大量使用Redis、MongoBD等NoSQL数据库,也会使用大量的Solr、Elasticsearch等全文检索服务。那么,这个时候,就会有一个问题需要我们来思考和解决:那就是数据同步的问题!如何将实时变化的数据库中的数据同步到Redis/MongoBD或者Solr/Elasticsearch中呢?

在这里插入图片描述

应用代码中同步

在增加、修改、删除之后,执行操作ES的逻辑代码。例如下面的代码片段。

public ResponseResult updateStatus(Long[] ids, String status){try{taskService.updateStatus(ids, status);if("status_success".equals(status)){List<Task> itemList = taskService.getTaskList(ids, status);//数据写入esesClient.importList(itemList);//数据写入redis
//          redisTemplate.save(itemList);return new ResponseResult(true, "修改状态成功")}}catch(Exception e){return new ResponseResult(false, "修改状态失败");}
}

优点

实施起来比较简单,简单服务里面常用的方式。

缺点

代码耦合度高。

和业务逻辑同步执行,效率变低。

Q:这里有一个问题想和大家讨论一下,对于一个方法里既有数据库的操作又有同步调用http/rpc接口的方法,如何保证一致性?

比如下面这个场景:

一个售后工单的处理,首先需要经过【客诉系统】,然后需要转到【工单系统】生成一个工单,方法逻辑大概如下:

@Transactional
public void handleKeSU(Integer orderId) {//调用http接口插入工单httpClient.saveGongDan(orderId);//修改客诉单状态为【已转工单】updateKeSuStatus(orderId);
}

因为流程问题,客诉单状态修改和工单系统生成工单需要一致,即工单生成成功,则客诉单状态修改成功,工单生成失败,则客诉单修改失败。

解决方案:将http调用放到本地数据库修改后面,依据事物回滚。

这样还有什么问题?当http调用响应时间超时,其实调用方工单已经生成成功,但是本地调用响应超时抛出异常导致回滚。

定时任务同步

在数据库中执行完增加、修改、删除操作后,通过定时任务定时的将数据库的数据同步到ES索引库中。

定时任务技术有:SpringTask,Quartz,XXLJOB。

这里执行定时任务时,需要注意的一个技巧是:第一次执行定时任务时,从MySQL数据库中以时间字段进行倒序排列查询相应的数据,并记录当前查询数据的时间字段的最大值,以后每次执行定时任务查询数据的时候,只要按时间字段倒序查询数据表中的时间字段大于上次记录的时间值的数据,并且记录本次任务查询出的时间字段的最大值即可,从而不需要再次查询数据表中的所有数据。

注意:这里所说的时间字段指的是标识数据更新的时间字段,也就是说,使用定时任务同步数据时,为了避免每次执行任务都会进行全表扫描,最好是在数据表中增加一个更新记录的时间字段。

优点

同步ES索引库的操作与业务代码完全解耦。

缺点

数据的实时性并不高。

通过MQ实现同步

在数据库中执行完增加、修改、删除操作后,向MQ中发送一条消息,此时,同步程序作为MQ中的消费者,从消息队列中获取消息,然后执行同步Solr索引库的逻辑。

我们可以使用下图来简单的标识通过MQ实现数据同步的过程。

我们可以使用如下代码实现这个过程。

public ResponseResult updateStatus(Long[] ids, String status){try{goodsService.updateStatus(ids, status);if("status_success".equals(status)){List<TbItem> itemList = goodsService.getItemList(ids, status);final String jsonString = JSON.toJSONString(itemList);//发送消息jmsTemplate.send(queueSolr, new MessageCreator(){@Overridepublic Message createMessage(Session session) throws JMSException{return session.createTextMessage(jsonString);}});}return new ResponseResult(true, "修改状态成功");}catch(Exception e){return new ResponseResult(false, "修改状态失败");}
}

优点

业务代码解耦,并且能够做到准实时。目前tk的ES同步用的就是这中方式吧

缺点

需要在业务代码中加入发送消息到MQ的代码,数据调用接口耦合。

通过CDC实现实时同步

通过CDC来解析数据库的日志信息,来检测数据库中表结构和数据的变化,从而更新ES索引库。

使用CDC可以做到业务代码完全解耦,API完全解耦,可以做到准实时。

CDC(change data capture,数据变更抓取)

通过数据源的事务日志抓取数据源变更,这能解决一致性问题(只要下游能保证变更应用到新库上)。它的问题在于各种数据源的变更抓取没有统一的协议,如
MySQL 用 Binlog,PostgreSQL 用 Logical decoding 机制,MongoDB 里则是 oplog。

  • Canal,阿里开源的基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql。

  • Databus,Linkedin 的分布式数据变更抓取系统。
    它的 MySQL 变更抓取模块很不成熟,官方支持的是 Oracle,MySQL 只是使用另一个开源组件 OpenReplicator 做了一个 demo。另一个不利因素 databus 使用了自己实现的一个 Relay 作为变更分发平台,相比于使用开源消息队列的方案,这对维护和外部集成都不友好。

  • Mysql-Streamer,Yelp 的基于python的数据管道。

  • Debezium,Redhat 开源的数据变更抓取组件。
    支持 MySQL、MongoDB、PostgreSQL 三种数据源的变更抓取。Snapshot Mode 可以将表中的现有数据全部导入 Kafka,并且全量数据与增量数据形式一致,可以统一处理,很适合数据库迁移;

Canal

canal [kə’næl],译意为水道/管道/沟渠,纯Java开发,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费,

目前canal只能支持row模式的增量订阅(statement只有sql,没有数据,所以无法获取原始的变更日志)

基于日志增量订阅&消费支持的业务

数据库实时备份
多级索引 (卖家和买家各自分库索引)
业务cache刷新
价格变化等重要业务消息

工作原理

在这里插入图片描述

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3. canal 解析 binary log 对象(原始为 byte 流)

Mysql主备复制实现

在这里插入图片描述
从上层来看,复制分成三步:

  1. master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  2. slave将master的binary log events拷贝到它的中继日志(relay log);
  3. slave重做中继日志中的事件,将改变反映它自己的数据;

Canal架构

在这里插入图片描述
通过deployer模块,启动一个canal-server,一个cannal-server内部包含多个instance,每个instance都会伪装成一个mysql实例的slave。client与server之间的通信协议由protocol模块定义。client在订阅binlog信息时,需要传递一个destination参数,server会根据这个destination确定由哪一个instance为其提供服务。
在这里插入图片描述
说明:

  • server代表一个canal运行实例,对应于一个jvm
  • instance对应于一个数据队列(1个server对应1…n个instance)

instance模块:

  • eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
  • eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
  • eventStore (数据存储,目前只存在内存里)
  • metaManager (增量订阅&消费信息管理器)

Canal是怎么假装成是Mysql Slave的?

  1. 与Mysql Master服务器建立Socket链接;
  2. 根据Mysql协议规范发送身份认证数据包进行身份认证;
  3. 根据Mysql协议规范发送slave注册数据包将自己伪装成Mysql Slave;
  4. 根据Mysql协议规范发送Dump请求,让Master给自己推送Binlog日志;

Canal是怎么解析binlog的?

Mysql Binlog介绍:http://dev.mysql.com/doc/refman/5.5/en/binary-log.html

一个binlog包含一个四字节的模数和一系列描述数据变更的Event,每一个Event又包含header和data两部分,大致结构如下:
在这里插入图片描述
基于Row模式的binlog主要包括以下几个Event:

目前canal只能支持row模式的增量订阅(statement只有sql,没有数据,所以无法获取原始的变更日志)

TABLE_MAP_EVENT:描述变更的数据库表

WRITE_ROWS_EVENT:描述插入数据变更

UPDATE_ROWS_EVENT:描述修改数据变更

DELETE_ROWS_EVENT:描述删除数据变更

根据Event的固定结构就可以解析出来相应的数据变更信息。

演示查看binlog:mysqlbinlog --no-defaults --base64-output=decode-rows -v ../data/binlog.000034 | more

Quick Start

准备

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

启动

下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.1.4 版本为例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

解压缩
mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

配置修改

vi conf/example/instance.properties
##mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
  • canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
  • 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

启动
sh bin/startup.sh

查看 server 日志
vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看 instance 的日志
vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

修改表数据,查看抓取到的日志

关闭
sh bin/stop.sh

变更分发

抓取到数据变更之后,需要考虑怎么将这些变更分发出去。

再来回顾一下Canal基本架构。
在这里插入图片描述

从上图可以看到Canal是Server-Client模式。

Server,主要是解析、分发、存储binlog;

Client,通过ClientAPI你可以从Server获取变更数据;

ClientAdapter,扩展Client的功能,包括将数据同步到RDB,ES,HBASE;

但其实这种Client模式并没有达到真正的解耦,更关键的是目前只有Java语言的Client,为了解决这个问题,大家自然而然想到消息中间件。

事实上,Canal 1.1.1版本以后也是支持在Canal Server解析binlog以后直接将数据投递到Kafka/RocketMQ。

于是就有了下面的同步方案:
在这里插入图片描述

总结

本文主要讨论数据同步方案,并对canal做了简单介绍。同时也对binlog的解析和mysql协议简单介绍希望能了解这种CDC的基本原理。


http://chatgpt.dhexx.cn/article/6JPvPDBq.shtml

相关文章

大数据之路——数据同步

三、数据技术篇—— 数据同步 3.1 数据同步基础 3.1.1 直连同步3.1.2 数据文件同步3.1.3 数据库日志解析同步 3.2 数据仓库同步方式3.2.1 批量数据同步3.2.2 实时数据同步 3.3 同步遇到的问题3.3.1 分库分表3.3.2 增量全量同步的合并3.3.3 数据漂移的处理 有多种不同应用场景&…

关于数据同步的几种实现

关于数据同步的几种实现 概述 关于数据同步主要有两个层面的同步&#xff0c;一是通过后台程序编码实现数据同步&#xff0c;二是直接作用于数据库&#xff0c;在数据库层面实现数据的同步。通过程序编码实现数据同步&#xff0c;其主要的实现思路很容易理解&#xff0c;即有…

数据同步技术

本次旨在分享数据同步技术的相关知识点&#xff0c;包括数据同步概述、数据同步工具、数据库、数据同步到大数据平台。 首先来介绍一下数据同步的概念&#xff1a; 数据同步是为保持数据源与目的地数据一致性而进行的数据传输、处理的过程。 数据同步的场景&#xff1a; 1、主…

几种常见的数据同步方式

数据仓库的特性之一是集成&#xff0c;即首先把未经过加工处理的、不同来源的、不同形式的数据同步到ODS层&#xff0c;一般情况下&#xff0c;这些ODS层数据包括日志数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中)&#xff0c;将数据采集并导入到数仓中(通常是Hive…

内网穿透frpc ,frps的使用

情况是这样的&#xff0c;公司内网中一个设备接了路由器下发的地址&#xff0c;内网地址是192.168.1.100&#xff0c;可以访问我的台式机&#xff0c;但我的台式机访问192.168.1.100是无法连通的 这种情况下&#xff0c;在我机器上运行frps.exe&#xff0c;frps.ini如下 [com…

Frp内网穿透——frps服务端部署

由于现在IPv4地址的短缺&#xff0c;在国内不可能每个设备都会分配到一个公网IP&#xff0c;因此从公网中访问自己的私有设备向来是一件难事儿。本次带大家了解一下frp内网穿透的服务端教学&#xff0c;让你也能够部署一个内网穿透服务。 frp简介 通俗的说&#xff0c;frp是一…

记一次使用frpc/frps进行内网穿透

1. 前提条件&#xff1a; 有一个公网ip&#xff0c;这里用x.x.x.x代替 2. 配置 【服务器端】 S_NUMBER是一个端口号 #服务端口 bind_port S_NUMBER #监听地址 bind_addr 0.0.0.0 #认证token token xxxx【客户端】(也就是需要被内网穿透的服务器) C_NUMBER是一个端口号 …

内网穿透神器Frps一键安装脚本及设置教程

frps 是一个高性能的反向代理应用&#xff0c;可以帮助您轻松地进行内网穿透&#xff0c;对外网提供服务&#xff0c;支持 tcp, http, https 等协议类型&#xff0c;并且 web 服务支持根据域名进行路由转发。 *因为frps是go语言写的&#xff0c;所以在路由器上使用的时候&#…

frpc和frps 内网穿透越狱插件

内网穿透、frp、frpc、frps https://zhaoboy9692.github.io/repo 越狱源 https://zhaoboy9692.github.io/repo 苦于在ios越狱下没有frp穿透使用 特地开发了的越狱插件 基于最新frp0.48编译 ios14.6测试没问题 有问题及时反馈

使用frps和frpc实现内网穿透

内网穿透的作用包括跨网段访问一个局域网中的一台主机。 如上图&#xff0c;假设我们想要通过主机A访问主机C&#xff0c;但是主机A和主机C绑定的都是私有ip地址&#xff0c;所以它们之间是无法直接进行通信的。要想使得A和C能够进行通信&#xff0c;就需要用到内网穿透的技术。…

frp服务端(frps) 安装及使用

FRP官方文档 https://gofrp.org/docs/ 服务端安装 环境 ubuntu 22.04 下载 Github 的 Release 中下载到最新版本的客户端和服务端二进制文件 可以指定你的目录&#xff0c;这里用 /usr/local/frp cd /usr/local/frp wget https://github.com/fatedier/frp/releases/dow…

CentOS Frp内网穿透:Frps+Nginx反向代理

目录 服务器使用配置 一、Nginx安装 二、Frps安装 三、frpc安装 服务器使用配置 CentOS 7.6 CPU: 2核 内存: 4GB 一、Nginx安装 参考《Centos配置Nginxtomcat》&#xff0c;这里就不做过多阐述 二、Frps安装 这里使用的是阿里源 #下载脚本 wget https://code.aliyun.com…

nginx反向代理frps frpc穿透

frps 和 nginx 在同一台机器&#xff0c;假设ip192.168.166.17 1. frps服务器端配置 测试时&#xff0c;frps服务器跟nginx在同一台机器(192.168.166.17)&#xff0c;理论上可以不在同一台机器&#xff0c;nginx可以代理http请求&#xff0c;发给frps服务端。 frps.ini # fr…

利用空闲服务器搭建frps服务端-实现穿透代理

利用frps代理Tcp或者udp或其它类型的连接 1、什么是frps/frpc frps是代理的服务端、frpc是代理的客户端&#xff0c;使用方数据传输到服务端&#xff0c;服务端再将数据传输到提供方&#xff0c;从而达到相互访问的目的。 2、什么是穿透 穿透就是客户端A和客户端B都没有公网…

利用frps进行内网穿透

这里使用的是传统穿透方法&#xff0c;需要一个有公网ip的中转节点去告知 看最下面&#xff0c;用最新版的frps 1、注意 服务器和客户机之间的数据传输全部经过中转服务器&#xff0c;传输速度将受制于中转服务器的上下行带宽。 2、穿透原理 其实就是客户端A绑定端口发送数…

使用frps建立内网穿透从而实现外界连接内网电脑的全教程

1. 说明 我有台服务器&#xff0c;但它在内网里&#xff0c;我需要通过ssh方式访问它&#xff0c;目前可以采用&#xff1a;向日葵等商业软件&#xff0c;RustDesk等开源软件。或者&#xff0c;《自建内网穿透服务器》。 本教程把实现上述功能的所有步骤罗列出来&#xff0c;以…

frp 内网穿透服务器搭建frps服务端和frpc客户端

1 工具 一台具有公网ip的服务器 2 下载frp frp下载地址 打开上面的frp下载地址 公网服务器上 打开下载文件 frps是服务端&#xff0c;在公网服务器上部署 frpc是客户端&#xff0c;在需要内网穿透的电脑上部署 1. frps配置 首先我这用的是win公网服务器 &#xff08;linux…

内网穿透配置(FRP)

目录 0、内网穿透的一般场景 1、内网穿透配置 a、frp软件下载 b、frp 的配置 3、通过 frp 实现远程连接 4、设置 frpc / frps 开机启动的方法 5、设置frp安全连接的方法 0、内网穿透的一般场景 放假回家怎么远程连接学校实验室的服务器&#xff1f; 先分析一波&#x…

FRP入门篇

目录 一、前言 1、概述 2、原理 3、支持功能 4、适用场景 二、环境准备 三、使用 1、安装包下载 2、服务端部署 2.1、上传安装包 2.3、启动服务端 3、客户端部署 3.1、代理服务准备 3.2、上传安装包 3.3、客户端配置 3.4、启动客户端 4、功能验证 一、前言 1、…

frps内网穿透

1 原理讲解 frp工作原理 服务端运行&#xff0c;监听一个主端口&#xff0c;等待客户端的连接&#xff1b; 客户端连接到服务端的主端口&#xff0c;同时告诉服务端要监听的端口和转发类型&#xff1b;服务端fork新的进程监听客户端指定的端口&#xff1b; 外网用户连接到客户…