ElasticSearch 7.15.2 使用java canal 接入实现灵活化增量数据准实时同步

article/2025/10/3 11:23:29

前言:
①canal.adapter-1.1.5 支持一对一单表的增量数据同步ElasticSearch 7;
②对于多表聚合场景的SQL满足不了我们的业务需求。
③采用java canal 接入,可以实现灵活化增量数据准实时同步

文章目录

          • 一、java canal 接入
            • 1. 依赖导入
            • 2. 增加配置
            • 3. canal 客户端
            • 4. 消息消费/处理模型
            • 5. 重建关联索引
          • 二、效果验证
            • 2.1. 关闭adapter
            • 2.2. 修改数据
            • 2.3. 数据监控
            • 2.4. 索引查询
            • 2.5. 关联数据修改
            • 2.6. 数据监控
            • 2.7. 索引查询

一、java canal 接入

前提:由于咱们是做增量数据同步ElasticSearch 7.15.2,因此项目中需要提前整合好ElasticSearch 7.15.2

1. 依赖导入
 <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.5</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.common --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.common</artifactId><version>1.1.5</version></dependency>
2. 增加配置

application.properties

# canal服务端ip
canal.alone-ip=192.168.159.134
# destination
canal.destination=example
canal.username=canal
canal.passwoed=canal
canal.port=11111
3. canal 客户端
package com.imooc.dianping.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;/*** canal 客户端** @author gblfy* @date 2021-11-22*/
@Component
public class CanalClient implements DisposableBean {private CanalConnector canalConnector;@Value("${canal.alone-ip}")private String CANALIP;@Value("${canal.destination}")private String DESTINATION;@Value("${canal.username}")private String USERNAME;@Value("${canal.passwoed}")private String PASSWOED;@Value("${canal.port}")private int PORT;@Beanpublic CanalConnector getCanalConnector() {//canal实例化canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(CANALIP, PORT)), DESTINATION, USERNAME, PASSWOED);//连接canalcanalConnector.connect();// 自定filter 格式{database}.{table}canalConnector.subscribe();// 回滚寻找上次中断的位置canalConnector.rollback();//返回连接return canalConnector;}@Overridepublic void destroy() throws Exception {if (canalConnector != null) {//防止canal泄露canalConnector.disconnect();}}
}

客户端有了,接下来,解决消息消费的问题?
消息消费的过程,就是轮训跑批的过程。可以简单理解为我们对应消息的消费,类似于canal客户端不断地从canal.deployer当中不断拉取mysql数据库中bin_log同步过来的消息,而消息消费完成之后,去告知canal.deployer,这条消息已经ack确认消费过了,之后,就不用推送给我了。使用这种方式,来完成消息消费的动作。

4. 消息消费/处理模型

接入消息消费模型CanalScheduling

package com.imooc.dianping.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.imooc.dianping.dal.ShopModelMapper;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** canal 消息消费模型** @author gblfy* @date 2021-11-22*/
@Component
public class CanalScheduling implements Runnable, ApplicationContextAware {//记录日志private final static Logger logger = LoggerFactory.getLogger(CanalScheduling.class);private ApplicationContext applicationContext;@Autowiredprivate ShopModelMapper shopModelMapper;@Resourceprivate CanalConnector canalConnector;@Autowiredprivate RestHighLevelClient restHighLevelClient;@Override//每个100毫秒 唤醒线程执行run方法@Scheduled(fixedDelay = 100)public void run() {//初始化批次IDlong batchId = -1;try {//批次/1000条int batchSize = 1000;Message message = canalConnector.getWithoutAck(batchSize);//1000条批次的ID  当获取的batchId=-1代表没有消息batchId = message.getId();List<CanalEntry.Entry> entries = message.getEntries();// batchId != -1 (内部消费有消息的)// entries.size() > 0有对应的内容//当batchId != -1 并且entries.size() > 0说明mysql bin_log发生了多少条数据的变化if (batchId != -1 && entries.size() > 0) {//逐条处理entries.forEach(entry -> {//处理类型: bin_log已ROW方式处理的消息if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {//消息解析处理publishCanalEvent(entry);}});}//消息解析完毕后,告知批次消息已经消费ack确认完成canalConnector.ack(batchId);} catch (Exception e) {e.printStackTrace();//将本次消息回滚,下次继续消息canalConnector.rollback(batchId);}}/*** 消息解析处理函数** @param entry*/private void publishCanalEvent(CanalEntry.Entry entry) {//事件类型 只关注INSERT、UPDATE、DELETECanalEntry.EventType eventType = entry.getHeader().getEventType();//获取发生变化的数据库String database = entry.getHeader().getSchemaName();//获取发生变化的数据库中的表String table = entry.getHeader().getTableName();CanalEntry.RowChange change = null;try {//记录这条消息发生了那些变化change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {e.printStackTrace();return;}change.getRowDatasList().forEach(rowData -> {List<CanalEntry.Column> columns = rowData.getAfterColumnsList();//主键String primaryKey = "id";CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()&& primaryKey.equals(column.getName())).findFirst().orElse(null);//将Columns转换成mapMap<String, Object> dataMap = parseColumnsToMap(columns);try {indexES(dataMap, database, table);} catch (IOException e) {e.printStackTrace();}});}/*** 将Columns转换成map** @param columns* @return*/Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {Map<String, Object> jsonMap = new HashMap<>();columns.forEach(column -> {if (column == null) {return;}jsonMap.put(column.getName(), column.getValue());});return jsonMap;}private void indexES(Map<String, Object> dataMap, String database, String table) throws IOException {logger.info("发生变化的行数据:{} ", dataMap);logger.info("发生变化的数据库:{}  ", database);logger.info("发生变化的表:{} ", table);// 限定处理出具库范围 支处理数据库名称为dianpingdb的消息if (!StringUtils.equals("dianpingdb", database)) {return;}/*** 我们要关注表数据范围* 当seller表、category表、shop 表发生变化,都buildESQuery* 当着3个参数中任意一个参数发生变化,我只需要将发生变化的ID传入,重建与此ID关联的索引*/List<Map<String, Object>> result = new ArrayList<>();if (StringUtils.equals("seller", table)) {result = shopModelMapper.buildESQuery(new Integer((String) dataMap.get("id")), null, null);} else if (StringUtils.equals("category", table)) {result = shopModelMapper.buildESQuery(null, new Integer((String) dataMap.get("id")), null);} else if (StringUtils.equals("shop", table)) {result = shopModelMapper.buildESQuery(null, null, new Integer((String) dataMap.get("id")));} else {//不关注其他的表return;}for (Map<String, Object> map : result) {IndexRequest indexRequest = new IndexRequest("shop");indexRequest.id(String.valueOf(map.get("id")));indexRequest.source(map);//更新索引restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
}
5. 重建关联索引
package com.imooc.dianping.dal;import com.imooc.dianping.model.ShopModel;
import org.apache.ibatis.annotations.Param;import java.math.BigDecimal;
import java.util.List;
import java.util.Map;public interface ShopModelMapper {//当着3个参数中任意一个参数发生变化,我只需要将发生变化的ID传入,重建与此ID关联的索引List<Map<String,Object>> buildESQuery(@Param("sellerId")Integer sellerId,@Param("categoryId")Integer categoryId,@Param("shopId")Integer shopId);
}
    <select id="buildESQuery" resultType="java.util.Map">select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flagfrom shop a inner join category b on a.category_id = b.id inner join seller c on c.id=a.seller_id<if test="sellerId != null">and c.id = #{sellerId}</if><if test="categoryId != null">and b.id = #{categoryId}</if><if test="shopId != null">and a.id = #{shopId}</if></select>
二、效果验证
2.1. 关闭adapter
cd /app/canal/canal.adapterbin/stop.sh
2.2. 修改数据

修改dianpingdb数据库中shop表中ID=1的数据中name的值
陕西面馆(北京亦庄) 调整为gblfy.com陕西面馆(北京亦庄),提交事务!
在这里插入图片描述

2.3. 数据监控

在这里插入图片描述

2021-11-23 16:19:21.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/shopcover/xchg.jpg, address=船厂路36号, latitude=31.195341, end_time=22:00, created_at=2021-11-19 15:53:52, tags=新开业 人气爆棚, start_time=10:00, updated_at=2021-12-22 15:53:52, category_id=1, name=gblfy.com陕西面馆(北京亦庄), remark_score=4.9, price_per_man=156, id=1, seller_id=1, longitude=120.915855} 
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb  
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:shop 
2.4. 索引查询

# 查询shop索引
GET /shop/_search
{"query": {"match": {"name": "陕西面馆"}}
}

在这里插入图片描述

2.5. 关联数据修改

单表增量同步,官网本身就支持,关联表数据修改,增量准实时同步,官网是不支持的。因此,咱们需要继续测试修改关联表的数据,再次验证。

修改dianpingdb数据库中category表中ID=1的数据中name的值
美食5 调整为美食我的最爱666,提交事务!

修改前:
在这里插入图片描述
修改后:
在这里插入图片描述

2.6. 数据监控

在这里插入图片描述

2021-11-23 16:25:16.325  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/firstpage/food_u.png, updated_at=2019-06-10 15:33:37, name=美食我的最爱666, created_at=2019-06-10 15:33:37, id=1, sort=99} 
2021-11-23 16:25:16.334  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb  
2021-11-23 16:25:16.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:category
2.7. 索引查询
GET /shop/_search
{"query": {"term": {"category_name": "美食我的最爱666"}}
}

在这里插入图片描述


# 查询shop索引
GET /shop/_search
{"query": {"match": {"name": "陕西面馆"}}
}

在这里插入图片描述
至此,我们完成了通过java代码的方式,灵活化的根据ID去接入我们对应es增量的准实时更新!小伙伴们,一起加油!


http://chatgpt.dhexx.cn/article/HlnpglBG.shtml

相关文章

浅入浅出keepalived+mysql实现高可用双机热备

当数据库发生宕机的情况,如果配置了数据库主从同步模式或主主同步模式,则可以从从库中获取数据。 当数据库发生宕机的情况,要求应用系统实现高可用,应用系统不会受到影响,需要对mysql进行双机热备实现数据库的高可用。 实现双机热备的方式有Mycat组件方式、canel组件方式、…

【硬刚大数据】大数据同步工具之FlinkCDC/Canal/Debezium对比

欢迎关注博客主页&#xff1a;微信搜&#xff1a;import_bigdata&#xff0c;大数据领域硬核原创作者_王知无(import_bigdata)_CSDN博客 欢迎点赞、收藏、留言 &#xff0c;欢迎留言交流&#xff01;本文由【王知无】原创&#xff0c;首发于 CSDN博客&#xff01;本文首发CSDN论…

4、Canal的 instance模块

一、总体结构 如确所述&#xff0c;Canal server的模式有两种&#xff1a;manager和spring 在core模块中&#xff0c;定义了CanalInstance接口&#xff0c;以及其抽象类子类AbstractCanalInstance。 在spring模块&#xff0c;提供了基于spring配置方式的CanalInstanceWithSpr…

canal实现mysql数据同步

前言 canal是实现mysql数据备份,异地灾备,异地数据同步等重要的中间件,在实际的业务场景中有着广泛的使用,本文基于小编所在项目中一个异地数据同步的场景为例,通过案例演示下利用canal实现mysql数据同步的过程 同步原理 如上图所示,为canal同步mysql数据的原理的简单示…

SpringBoot系列之canal和kafka实现异步实时更新

SpringBoot系列之canal和kafka实现异步实时更新 实验开发环境 JDK 1.8SpringBoot2.2.1Maven 3.2+开发工具 IntelliJ IDEAsmartGit1、什么是阿里canal? canal是阿里开源的, 对数据库增量日志解析,提供增量数据订阅和消费的组件。引用官网的图片,canal的工作原理主要是模拟…

Java:SpringBoot整合Canal+RabbitMQ组合实现MySQL数据监听

canal [kə’nl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 目录 一、MySQL设置二、启动Canal服务端三、通过Canal客户端消费数据四、通过RabbitMQ消费数据1、启动RabbitMQ2、修改canal配置3、消…

Kubernetes 学习19基于canel的网络策略

一、概述 1、我们说过&#xff0c;k8s的可用插件有很多&#xff0c;除了flannel之外&#xff0c;还有一个流行的叫做calico的组件&#xff0c;不过calico在很多项目中都会有这个名字被应用&#xff0c;所以他们把自己称为project calico&#xff0c;但是很多时候我们在kubernet…

基于canel的网络策略

基于canel的网络策略 canel工作的默认网段是192.168.0.0/16的网段 官网&#xff1a;https://docs.projectcalico.org/v3.2/introduction/ 参考地址&#xff1a; https://docs.projectcalico.org/v3.2/getting-started/kubernetes/installation/flannel 分别运行&#xff1a;…

REDIS11_缓存和数据库一致性如何保证、解决方案、提供Canel解决数据一致性问题

文章目录 ①. 缓存和数据库双写一致保证②. 缓存数据一致性-解决方案③. 缓存数据一致性-解决-Canal ①. 缓存和数据库双写一致保证 ①. 只要用缓存,就可能会涉及到缓存与数据库双存储双写,你只要是双写,就一定会有数据一致性的问题 ②. 那么,如何解决一致性问题&#xff1f;提…

binlog+canel windows服务器下mysql数据的部署过程

1.开启mysql的binlog功能&#xff0c;并配置binlog模式为row。 1.1windos下修改mysql的my.ini文件&#xff08;该文件在C:\ProgramData\MySQL\MySQL Server 5.7&#xff0c;而不是C:\Program Files\MySQL\MySQL Server 5.7下的my-default.ini文件&#xff0c;C:\Pr…

十九、基于canel的网络策略

说明 Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设…

canel-1.1.5 canal.deployer安装

简介 canal [kə’nl]&#xff0c;译意为水道/管道/沟渠&#xff0c;主要用途是基于 MySQL 数据库增量日志解析&#xff0c;提供增量数据订阅和消费 工作原理 canal 模拟 MySQL slave 的交互协议&#xff0c;伪装自己为 MySQL slave &#xff0c;向 MySQL master 发送 dump 协…

Kubernetes基于canel的网络策略

Calico的官方地址&#xff1a;https://docs.projectcalico.org/v3.8/getting-started/kubernetes/installation/flannel 安装canel之前需要注意 如果您使用的是pod CIDR 10.244.0.0/16&#xff0c;请跳至下一步。如果您使用的是其他pod CIDR&#xff0c;请使用以下命令设置一个…

binlog以及Canel

一、NDB存储引擎 在介绍binlog之前,需要先了解一下NDB NDB 存储引擎也叫NDB Cluster 存储引擎,主要用于MySQL Cluster 分布式集群环境。 NDB特点: • 分布式:分布式存储引擎,可以由多个NDBCluster存储引擎组成集群分别存放整体数据的一部分 • 支持事务:和Innodb一样…

canel的网络策略

资源&#xff1a; https://docs.projectcalico.org/v3.2/getting-started/kubernetes/installation/flannel 基于pod Egress 是Pod作为客户端(需要定义目标端口和目标地址) ingress 是pod作为服务端(需要定义目标地址和pod自己的端口) canel工作的默认网段是192.168.0.0/16的网…

超详细的Canal入门,看这篇就够了!

思维导图 文章已收录Github精选&#xff0c;欢迎Star&#xff1a;https://github.com/yehongzhi/learningSummary 前言 我们都知道一个系统最重要的是数据&#xff0c;数据是保存在数据库里。但是很多时候不单止要保存在数据库中&#xff0c;还要同步保存到Elastic Search、HB…

数据同步神器Canel-day01

背景 关于数据同步的方式有很多种&#xff0c;现在有一个场景需要将mysql数据库的数据主动同步到我们的工程中&#xff0c;并且能再mysql数据库客户端更改某一行的数据也能将数据同步到另一个数据库或者工程中&#xff0c;对于这种场景的使用我们应该怎么去实现呢&#xff1f;…

Canal使用流程、部署安装文档

文章目录 背景信息使用流程步骤一&#xff1a;准备MySQL数据源步骤二&#xff1a;创建索引步骤三&#xff1a;安装并启动Canal-server(Deployer)步骤四&#xff1a;部署Client-Adapter步骤五&#xff1a;验证增量数据同步步骤六&#xff1a;同步MySQL全量数据&#xff08;如果不…

Canel-简介使用

简介 canal 的工作原理 MySQL 主从复制过程 ➢ Master 主库将改变记录&#xff0c;写到二进制日志(binary log)中 ➢ Slave 从库向 mysql master 发送 dump 协议&#xff0c;将 master 主库的 binary log events 拷贝到它的中继日志(relay log)&#xff1b; ➢ Slave 从…

检查页面Session是否过期,过期执行相应操作 解决方法

how to check session is expired or not if expired then redirect to login page 在项目中&#xff0c;如果客户打开页面时间过久容易导致页面Session过期&#xff0c;再进行任何操作时都会提示“Asp.Net session has expired”&#xff0c;这样毕竟都用户不太友好&#xff0…