DataLink 数据同步平台

article/2025/9/11 21:43:49

文章目录

  • 一、数据同步平台
    • 概述
    • 核心能力
    • 工作原理
    • 详细流程
  • 二、快速接入
    • 部署中间件
    • 程序配置
    • 创建数据库表
    • 启动应用
    • 注意事项
  • 三、扩展:四种 CDC 方案比较优劣


一、数据同步平台

在项目开发中,经常需要将数据库数据同步到 ES、Redis 等其他平台,通过自己写代码进行同步过程中面临诸多问题,比如业务代码和同步代码高耦合,数据一致性等;一个支持多输入输出源的数据同步平台就显得很有意义。

概述

  • DataLink 数据平台基于当前成熟的 Canal、定时任务补偿、Dubbo 泛化调用等方案,实现异构数据平台的同步和最终一致性问题,支持集群部署。通过 DataLink 健康检查程序,可以对所有中间件和 DataLink 本身服务的异常情况(包括服务宕机等)进行补偿操作。
  • DataLink 当前主要支持对 Mysql 数据同步到 ElasticSearch,对 Oracle、Redis 以及其他 RDB 的输出正在规划中。

核心能力

  • Canal 实时同步
    Canal 订阅 Mysql binlog,推送数据变更信息到 RocketMQ,监听 MQ 处理变更数据到 ElasticSearch
  • 区间数据健康检查、自动补偿
    基于 Elastic-Job 定时任务方式,MD5 比较 ES 与数据库信息,有差异则同步,同时解决所有中间件或同步程序可能的异常情况补偿
  • 基于 RocketMQ 的自定义同步支持
    接入方在没有部署 Canal Server,仅有 RocketMQ 的情况下,主动往 RocketMQ 发送数据请求同步

工作原理

工作原理

详细流程

  • 实时数据同步流程
    实时数据同步流程

  • 健康检查与补偿流程
    健康检查与补偿流程


二、快速接入

部署中间件

  • 部署 Canal Server,使用 MQ 模式,可参考 https://github.com/alibaba/canal
  • 部署 ElasticSearch 6.x,参考 https://www.elastic.co/cn/blog/state-of-the-official-elasticsearch-java-clients
  • 部署 Rocketmq

程序配置

spring:datasource:name: datalink-servicetype: com.alibaba.druid.pool.DruidDataSourceurl: jdbc:mysql://127.0.0.1:3306/ins_basicdata_prd?autoReconnect=true&characterEncoding=UTF8&useSSL=false&serverTimezone=UTCusername: ******password: ******driver-class-name: com.mysql.jdbc.Driverdubbo:application:name: datalinkregistry:address: zookeeper://127.0.0.1:2181?timeout=30000protocol: zookeepertimeout: 60000protocol:port: 21990name: dubboid: dubboconsumer:timeout: 6000rocketmq:# 业务方主动推送 MQ 变更配置(可选)biz:namesrvAddr: 127.0.0.1:9876groupName: datalinkBizConsumerGroup# canal监听的变更 MQ 配置canal:namesrvAddr: 127.0.0.1:9876groupName: datalinkCommonConsumerGroupelaticjob:zookeeper:server-lists: 127.0.0.1:2181namespace: datalinkelasticsearch:cluster-name: elasticsearchcluster-nodes: 127.0.0.1:9300

创建数据库表

CREATE TABLE `m_elastic_search_cfg` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',`config_name` VARCHAR(64) NOT NULL COMMENT '配置名字',`match_topic` VARCHAR(64) NOT NULL COMMENT '匹配的topic',`listen_table` TEXT COMMENT '匹配的数据库和数据表',`primary_key` VARCHAR(64) DEFAULT NULL COMMENT '主键',`interface_name` VARCHAR(128) NOT NULL COMMENT '接口名字',`function_name` VARCHAR(128) NOT NULL COMMENT '方法名字',`search_class` VARCHAR(128) NOT NULL COMMENT '方法的参数',`search_key` VARCHAR(64) NOT NULL COMMENT '方法的参数的主键',`data_node_path` VARCHAR(64) DEFAULT NULL COMMENT '数据节点路径',`version` VARCHAR(64) DEFAULT NULL COMMENT '版本',`es_index` VARCHAR(64) NOT NULL COMMENT 'es的索引',`handler` VARCHAR(64) NOT NULL COMMENT '处理类',`increment_field` VARCHAR(64) DEFAULT NULL COMMENT '定时器补偿字段',`increment_table` TEXT COMMENT '定时器补偿表',`increment_date_column` VARCHAR(64) NOT NULL DEFAULT 'UPDATE_TIME' COMMENT '定时器补偿日期字段',`last_check_time` DATETIME DEFAULT NULL COMMENT '定时器最近同步日期',`create_time` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4 COMMENT='es配置表'
  • match_topic
    配置格式为:ins_trade_prd_0,ins_trade_prd_[0-9](监听 ins_trade_prd_0 到 ins_trade_prd_9)
  • listen_table
    监听表配置,支持正则表达式,例如 ins_trade_prd_[0-9]+\.t_order_main.* 表示监听数据库 ins_trade_prd_0 到 ins_trade_prd_9 10 个数剧库,前缀为 t_order_main 的所有表(分表)。仅 handler 为 DML_NOTIFY_NORMAL 需要配置。
  • primary_key
    主键或者从表的外键,即能从 canal 监听的 dml 数据中提取出用于 dubbo 查询的 searchKey 值;逗号间隔,依次提取
  • interface_name、function_name、search_class、search_key、data_node_path 和 version
    都是配置 dubbo provider 的接口信息,其中 data_node_path 指定返回结果的数据路径 $.content(符合 jsonpath 语法,以 $ 开头)
  • handler
    BIZ_NOTIFY_NORMAL,监听业务推送消息Handler
    DML_NOTIFY_NORMAL,监听业务表数据变更Handler
  • increment_field、increment_table、increment_date_column
    分别为定时任务健康检查和补偿相关字段。increment_table的配置规则同上面match_topic

启动应用

  • 运行 DataLinkApplication 类启动应用程序,几种触发数据同步方式:
    • Canal 监听的业务数据表发生变更
    • 主动往业务 MQ 插入消息
    • 等待定时任务健康检查、数据补偿

注意事项

  • 同步 ES 的索引需要建立,Mapping 映射最好可以先建立(如果不建立,由 ES 动态创建 Mapping)
  • 分库分表情况下,库需要在同一个 IP 下,能够互相访问

三、扩展:四种 CDC 方案比较优劣

  • 抽取处理需要重点考虑增量抽取,也被称为变化数据捕获,简称 CDC。假设一个数据仓库系统,在每天夜里的业务低峰时间从操作型源系统抽取数据,那么增量抽取只需要过去 24 小时内发生变化的数据。变化数据捕获也是建立准实时数据仓库的关键技术。
  • 当你能够识别并获得最近发生变化的数据时,抽取及其后面的转换、装载操作显然都会变得更高效,因为要处理的数据量会小很多。遗憾的是,很多源系统很难识别出最近变化的数据,或者必须侵入源系统才能做到。变化数据捕获是数据抽取中典型的技术挑战。
  • 常用的变化数据捕获方法有时间戳、快照、触发器和日志四种。相信熟悉数据库的读者对这些方法都不会陌生。时间戳方法需要源系统有相应的数据列表示最后的数据变化。快照方法可以使用数据库系统自带的机制实现,如 Oracle 的物化视图技术,也可以自己实现相关逻辑,但会比较复杂。触发器是关系数据库系统具有的特性,源表上建立的触发器会在对该表执行 insert、update、delete 等语句时被触发,触发器中的逻辑用于捕获数据的变化。日志可以使用应用日志或系统日志,这种方式对源系统不具有侵入性,但需要额外的日志解析工作。
  • CDC 大体可以分为两种,一种是侵入式的,另一种是非侵入式的。所谓侵入式的是指 CDC 操作会给源系统带来性能的影响。只要 CDC 操作以任何一种方式对源库执行了 SQL 语句,就可以认为是侵入式的 CDC。基于时间戳的 CDC、基于触发器的 CDC、基于快照的 CDC 是侵入性的,基于日志的 CDC 是非侵入性的。
  • 下表总结了四种CDC方案的特点。
项目时间戳方式快照方式触发器方式日志方式
能区分插入/更新
周期内,检测到多次更新
能检测到删除
不具有侵入性
支持实时
需要 DBA
不依赖数据库

说明:本篇是公司研发平台数据同步的一套解决方案,目前在公司多个项目中已落地,效果还不错,本人在此进行一个分享和记录,其中源码不对外开放,欢迎一起探讨~。


http://chatgpt.dhexx.cn/article/YUEoHQIn.shtml

相关文章

数据同步之全量同步与增量同步

一、什么是数据同步 业务数据是数据仓库的重要数据来源,我们需要每日定时从业务数据库中抽取数据,传输到数据仓库中,之后再对数据进行分析统计。 为保证统计结果的正确性,需要保证数据仓库中的数据与业务数据库是同步的&#xff0…

你了解数据同步吗?

1.写在前面 本篇博客参考《操作系统实战 45 讲》 上篇博客主要介绍的是程序放在什么地方,开发操作系统要了解的最核心的硬件——CPU、MMU、Cache、内存,知道了它们的工作原理。在程序运行中,它们起到了至关重要的作用。 在开发我们自己的操…

数据库同步有哪些方式?【怎么保障目标和源数据一致性】

文章目录 摘要一、几种主流的数据库同步方式二、架构及工作原理三、全量同步和实时增量同步机制四、源和目标五、举例:Oracle 数据实时同步到 Elasticsearch六、目标和源数据一致性七、异构数据类型转换八、总结 摘要 数据库同步有3大难题: 1是如何保障…

数据技术篇之数据同步

第3章 数据同步 1.数据同步基础 直连同步 (1)什么是直连同步?直连同步是指通过定义好的规范接口 API 和基于动态链接库的方式直接连接业务库,如 ODBC/JDBC 等规定了统 一规范的标准接口,不同的数据库基于这套标准接口…

聊聊数据同步方案

文章目录 常用的数据同步方案数据库迁移场景数据同步场景应用代码中同步定时任务同步通过MQ实现同步通过CDC实现实时同步 CDC(change data capture,数据变更抓取)Canal基于日志增量订阅&消费支持的业务工作原理Mysql主备复制实现Canal架构…

大数据之路——数据同步

三、数据技术篇—— 数据同步 3.1 数据同步基础 3.1.1 直连同步3.1.2 数据文件同步3.1.3 数据库日志解析同步 3.2 数据仓库同步方式3.2.1 批量数据同步3.2.2 实时数据同步 3.3 同步遇到的问题3.3.1 分库分表3.3.2 增量全量同步的合并3.3.3 数据漂移的处理 有多种不同应用场景&…

关于数据同步的几种实现

关于数据同步的几种实现 概述 关于数据同步主要有两个层面的同步,一是通过后台程序编码实现数据同步,二是直接作用于数据库,在数据库层面实现数据的同步。通过程序编码实现数据同步,其主要的实现思路很容易理解,即有…

数据同步技术

本次旨在分享数据同步技术的相关知识点,包括数据同步概述、数据同步工具、数据库、数据同步到大数据平台。 首先来介绍一下数据同步的概念: 数据同步是为保持数据源与目的地数据一致性而进行的数据传输、处理的过程。 数据同步的场景: 1、主…

几种常见的数据同步方式

数据仓库的特性之一是集成,即首先把未经过加工处理的、不同来源的、不同形式的数据同步到ODS层,一般情况下,这些ODS层数据包括日志数据和业务DB数据。对于业务DB数据而言(比如存储在MySQL中),将数据采集并导入到数仓中(通常是Hive…

内网穿透frpc ,frps的使用

情况是这样的,公司内网中一个设备接了路由器下发的地址,内网地址是192.168.1.100,可以访问我的台式机,但我的台式机访问192.168.1.100是无法连通的 这种情况下,在我机器上运行frps.exe,frps.ini如下 [com…

Frp内网穿透——frps服务端部署

由于现在IPv4地址的短缺,在国内不可能每个设备都会分配到一个公网IP,因此从公网中访问自己的私有设备向来是一件难事儿。本次带大家了解一下frp内网穿透的服务端教学,让你也能够部署一个内网穿透服务。 frp简介 通俗的说,frp是一…

记一次使用frpc/frps进行内网穿透

1. 前提条件: 有一个公网ip,这里用x.x.x.x代替 2. 配置 【服务器端】 S_NUMBER是一个端口号 #服务端口 bind_port S_NUMBER #监听地址 bind_addr 0.0.0.0 #认证token token xxxx【客户端】(也就是需要被内网穿透的服务器) C_NUMBER是一个端口号 …

内网穿透神器Frps一键安装脚本及设置教程

frps 是一个高性能的反向代理应用,可以帮助您轻松地进行内网穿透,对外网提供服务,支持 tcp, http, https 等协议类型,并且 web 服务支持根据域名进行路由转发。 *因为frps是go语言写的,所以在路由器上使用的时候&#…

frpc和frps 内网穿透越狱插件

内网穿透、frp、frpc、frps https://zhaoboy9692.github.io/repo 越狱源 https://zhaoboy9692.github.io/repo 苦于在ios越狱下没有frp穿透使用 特地开发了的越狱插件 基于最新frp0.48编译 ios14.6测试没问题 有问题及时反馈

使用frps和frpc实现内网穿透

内网穿透的作用包括跨网段访问一个局域网中的一台主机。 如上图,假设我们想要通过主机A访问主机C,但是主机A和主机C绑定的都是私有ip地址,所以它们之间是无法直接进行通信的。要想使得A和C能够进行通信,就需要用到内网穿透的技术。…

frp服务端(frps) 安装及使用

FRP官方文档 https://gofrp.org/docs/ 服务端安装 环境 ubuntu 22.04 下载 Github 的 Release 中下载到最新版本的客户端和服务端二进制文件 可以指定你的目录,这里用 /usr/local/frp cd /usr/local/frp wget https://github.com/fatedier/frp/releases/dow…

CentOS Frp内网穿透:Frps+Nginx反向代理

目录 服务器使用配置 一、Nginx安装 二、Frps安装 三、frpc安装 服务器使用配置 CentOS 7.6 CPU: 2核 内存: 4GB 一、Nginx安装 参考《Centos配置Nginxtomcat》,这里就不做过多阐述 二、Frps安装 这里使用的是阿里源 #下载脚本 wget https://code.aliyun.com…

nginx反向代理frps frpc穿透

frps 和 nginx 在同一台机器,假设ip192.168.166.17 1. frps服务器端配置 测试时,frps服务器跟nginx在同一台机器(192.168.166.17),理论上可以不在同一台机器,nginx可以代理http请求,发给frps服务端。 frps.ini # fr…

利用空闲服务器搭建frps服务端-实现穿透代理

利用frps代理Tcp或者udp或其它类型的连接 1、什么是frps/frpc frps是代理的服务端、frpc是代理的客户端,使用方数据传输到服务端,服务端再将数据传输到提供方,从而达到相互访问的目的。 2、什么是穿透 穿透就是客户端A和客户端B都没有公网…

利用frps进行内网穿透

这里使用的是传统穿透方法,需要一个有公网ip的中转节点去告知 看最下面,用最新版的frps 1、注意 服务器和客户机之间的数据传输全部经过中转服务器,传输速度将受制于中转服务器的上下行带宽。 2、穿透原理 其实就是客户端A绑定端口发送数…