数据库与缓存一致性解决方案

article/2025/8/27 19:20:43

数据库与缓存一致性解决方案

文章目录

  • 数据库与缓存一致性解决方案
    • 前言
    • 几种方案的分析
    • 方案的实现

前言

项目中如果用到了缓存,就会涉及到数据库与缓存的双写,由于这两个操作不是原子性的,在并发的场景下,容易产生数据库与缓存不一致的情况。

几种方案的分析

数据库与缓存的双写有很多种方案,我们先来看几种最常见的:

1. 先更新数据库再更新缓存

这种方案最容易想到,但是也很容易出问题,比如写请求A先更新了数据库,这时候,写请求B也更新了数据库,接着又更新了缓存,最后写请求A又更新了一次缓存,这个时候缓存中就出现了脏数据。

在这里插入图片描述

2. 先更新缓存再更新数据库

假如缓存更新成功,数据库更新失败,那么肯定会照成数据不一致。

3. 先删除缓存再更新数据库

写请求A进行写操作,删除缓存,读请求B查询缓存发现不存在,B去数据库查询得到旧值然后写入缓存,最后写请求A才将新值写入数据库,这个时候缓存中就是脏数据。

由于对数据库的读一般比写要快,所以这种情况是比较容易发生的。

在这里插入图片描述

4. 先更新数据库再删除缓存

读请求A查询数据库,得到一个旧值,写请求B将新值写入数据库,写请求B删除缓存,请求A将查询到的旧值写入数据库,这个时候缓存中就出现了脏数据。

但是这种情况发生的概率比较低,因为数据库的读操作一般比写操作快,所以操作1完成之后,马上就会进行操作4。所以最推荐就是这种方式。
在这里插入图片描述

删缓存还是写缓存?

现在我们发现,对缓存的操作有两种,一种是更新缓存,一种是删除缓存。其实一般采取的是删缓存,原因有两点:

  1. 并发环境下,无论是先操作数据库还是后操作数据库而言,如果加上更新缓存,那就更加容易导致数据库与缓存数据不一致问题。(删除缓存直接和简单很多)。
  2. 如果每次更新了数据库,都要更新缓存(这里指的是写多读少的场景),倒不如直接删除掉。等再次读取时,缓存里没有,就去数据库找,在数据库找到再写到缓存里边(体现懒加载)。

删除缓存失败

明确了删缓存的方案之后,现在面临最大的问题就是缓存删除失败了该怎么办,如果删除失败了就一定会出现不一致的情况,在这里,其实可以做一个保障删除缓存失败后重试的机制,请看方案5。

5. 订阅MySql的binlog日志,发送到消息队列再做删除

首先更新数据库的数据,数据库会将数据表数据的变更信息写入binlog日志中,监听到日志文件的变化后,把数据库变更信息发送到消息队列中,程序接收到消息队列中的数据,对缓存做删除。如果删除失败了,程序就把数据再次发送到消息队列中,再做一次删除,实现删除失败后的重试。

这种方案还有一种好处就是不会对业务代码造成过多的侵入,我们可以专门起一条协程来监听消息队列,如果收到消息队列中的数据,直接去删除对应的缓存即可,而不必在业务代码中去写。

在这里插入图片描述

方案的实现

下面我们来实现一下刚刚列举的最后一种方案:

1.mysql的配置

mysql需要开启binlog,首先查看一下mysql是否开启了binlog:

# 如果log_bin的值为OFF是未开启,为ON是已开启
SHOW VARIABLES LIKE '%log_bin%'

如果未开启的话,可以修改一下/etc/my.cnf

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

配置好之后重启一下mysql。

接着创建用于同步的mysql账号:

mysql -uroot -p password
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2.rabbitmq的配置

我们把canal订阅的binlog更新信息发送到rabbitmq中,再由程序去读取。为了方便,将rabbitmq安装在docker中。

首先创建一个目录用于与容器中的rabbitmq配置文件形成映射:

mkdir /opt/module/rabbitmq/data -p

接着在docker中运行rabbitmq镜像:

# 5672是rabbitmq 默认TCP监听端口,到时候程序连接的也是这个端口
# 15672是rabbitmq提供的ui管理界面的端口
# 25672是rabbitmq集群之间通信的端口
# 如果docker跑在云服务器上,记得在安全组中开放5672和15672端口
docker run -d --hostname rabbit-svr --name rabbit -p 5672:5672 -p 15672:15672 -p 25672:25672 -v /opt/module/rabbitmq/data:/var/lib/rabbitmq rabbitmq:management

然后就可以在浏览器中访问rabbitmq的ui控制界面了,默认账号和密码都是guest

在这里插入图片描述
我们先创建一个exchange,类型选择fanout:

在这里插入图片描述
接着创建一个queue用于监听exchang中的消息,创建好queue之后需要点进去绑定一下刚刚创建的canal.deleteCache

在这里插入图片描述
在这里插入图片描述

3.canal配置

canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。

它可以订阅mysql的binlog日志,然后将更新的数据发送到消息队列中。

下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz

由于这个工具是java开发的,所以我们还需要在linux环境下配置一下java环境,我配置的是jdk1.8。

首先将jdk目录放在/usr/local/java/目录下,接着编辑/etc/profile文件配置一下环境变量:

export JAVA_HOME=/usr/local/java/jdk1.8.0_181
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH

执行一下这个配置脚本:source /etc/profile

执行echo $JAVA_HOME命令,可以看到jdk文件的路径:

在这里插入图片描述
自此,java的环境就配好了,接着来配置一下 canal

先将.tar.gz文件解压到/opt/module/canal/目录下,首先来编辑conf/目录下的canal.properties文件:

canal.serverMode = rabbitMQrabbitmq.host = 127.0.0.1
rabbitmq.virtual.host = /
# rabbitmq 中新建的 Exchange
rabbitmq.exchange = canal.deleteCache
rabbitmq.username = guest
rabbitmq.password = guest

接着编辑conf/example/目录下的instance.properties文件:

canal.instance.master.address=127.0.0.1:3306# mysql中配置的用于同步的canal用户
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal# rabbitmq中配置的 绑定的 routingkey,如果是topic模式就需要配置,fanout模式不用配置
# canal.mq.topic=test.routingKey# 指定要订阅哪个库下哪个表的更新记录,默认是今天所有库
# canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=cloud-disk.user_repository

在启动canal之前,最好把mysql的binlog文件清理一下,不然可能会出现匹配不到当前位置的错误:

mysql -uroot -p password
# 该命令会删除所有binlog
RESET MASTER;
# 删除mysql-bin.010之前所有日志
PURGE MASTER LOGS TO 'mysql-bin.010';
# 删除2003-04-02 22:46:26之前产生的所有日志
PURGE MASTER LOGS BEFORE '2003-04-02 22:46:26';

最后启动bin目录下的startup.sh脚本,查看logs/canal目录下的的canal.log,出现以下内容说明启动成功:

在这里插入图片描述

至此,环境准备完成。

4.测试数据

Go语言可以使用github.com/streadway/amqp库来操作rabbitmq,执行以下命令来安装:

go get github.com/streadway/amqp

然后我们对这个库做一个二次封装方便使用,将以下程序写到mq/rabbitmq.go中:

package mqimport ("encoding/json""github.com/streadway/amqp"
)// RabbitMQ RabbitMQ结构图
type RabbitMQ struct {channel  *amqp.ChannelName     stringexchange string
}// New 连接RabbitMQ服务,声明一个消息队列
func New(s, name string) *RabbitMQ {conn, e := amqp.Dial(s)if e != nil {panic(e)}ch, e := conn.Channel()if e != nil {panic(e)}q, e := ch.QueueDeclare(name,false,true,false,false,nil)if e != nil {panic(e)}mq := new(RabbitMQ)mq.channel = chmq.Name = q.Namereturn mq
}// Bind 消息队列绑定交换机
func (q *RabbitMQ) Bind(exchange, key string) {e := q.channel.QueueBind(q.Name,key,exchange,false,nil)if e != nil {panic(e)}q.exchange = exchange
}// Send 向消息队列发布消息
func (q *RabbitMQ) Send(queue string, body interface{}) {str, e := json.Marshal(body)if e != nil {panic(e)}e = q.channel.Publish("",queue,false,false,amqp.Publishing{ReplyTo: q.Name,Body:    []byte(str),})if e != nil {panic(e)}
}// Publish 向交换机发送消息
func (q *RabbitMQ) Publish(excahnge string, body interface{}) {str, e := json.Marshal(body)if e != nil {panic(e)}e = q.channel.Publish(excahnge,"",false,false,amqp.Publishing{ReplyTo: q.Name,Body:    []byte(str),})if e != nil {panic(e)}
}// Consume 消费消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {c, e := q.channel.Consume(q.Name,"",true,false,false,false,nil)if e != nil {panic(e)}return c
}// Close 关闭连接
func (q *RabbitMQ) Close() {q.channel.Close()
}

然后创建一个WatchBinLog函数来获取mq中的数据,一旦监听到数据,就可以对缓存进行删除,如果缓存删除失败,就再次向绑定的exchange中发送binlog的更新信息,实现删除重试:

func WatchBinLog(conf config.Config) {q := New(config.Conf.RabbitMQ.RabbitURL, "queue.deleteCache")defer q.Close()q.Bind(config.Conf.RabbitMQ.CanalExchange, "")c := q.Consume()cacheDB := models.InitCacheDB(conf)conn := cacheDB.RedisPool.Get()for msg := range c {payload := getPayload(msg.Body)var err error// 监听到了mq发送过来的binlog变动,删除缓存_, err = conn.Do("HDEL", payload.UserIdentity, payload.ParentId+"file")_, err = conn.Do("HDEL", payload.UserIdentity, payload.ParentId+"folder")// 如果失败,往mq中重新发送if err != nil {logx.Error("删除缓存失败, payload: ", payload)retryMq := New(config.Conf.RabbitMQ.RabbitURL, "")retryMq.Publish(config.Conf.RabbitMQ.CanalExchange, string(msg.Body))}}
}

最后在main函数中起一个协程来运行WatchBinLog函数,不对业务代码进行侵入:

go mq.WatchBinLog(config.Conf)

程序启动之后,改变数据库中的数据,就会发现缓存会被自动删除。


http://chatgpt.dhexx.cn/article/ymrIuN8E.shtml

相关文章

数据库实时同步技术解决方案_两个数据库同步_数据库双向同步方案

SyncNavigator v8.6.2 SyncNavigator是一款功能强大的数据库同步软件,适用于SQL SERVER, MySQL,具有自动/定时同步数据、无人值守、故障自动恢复、同构/异构数据库同步、断点续传和增量同步等功能,支持Windows xp以上所有操作系统,适用于大容量数据库快速同步。 安装包下…

MySQL数据库主从双向同步

MySQL数据库主从双向同步 一 环境二 主从同步1.主服务器配置2.从服务器配置 二 双向同步三 其他相关参数配置四 后记 一 环境 由于开发需要&#xff0c;需要两台服务器进行负载均衡&#xff0c;两台服务器配置完全相同&#xff0c;均为windows server 2012,且MySQL数据库版本为…

容灾解决方案介绍

1.容灾需求 2.容灾挑战 3. 什么是HA 4.什么是容灾 5.HA和DR的关系 6.容灾和备份的区别 7.衡量容灾系统的主要指标 8.容灾系统的级别 9.灾备系统建设的国际标准 10.华为业务连续性灾备解决方案全景图 11.容灾备份解决方案框架 12.容灾设计模式-同步、异步相结合 13.主备容灾方…

数据库容灾方案

**数据库容灾方案** 场景一 “阵列硬盘坏了&#xff0c;数据库读写文件异常&#xff0c;系统停运啦&#xff01;”场景二 “不小心执行了TRUNCATE操作&#xff0c;核心业务表被清空&#xff0c;系统关键功能无法使用&#xff01;”场景三 “在硬件投入变化不大的情况下&#xf…

MySql数据库从库同步的延迟问题及解决方案

1)、MySQL数据库主从同步延迟原理mysql主从同步原理&#xff1a; 主库针对写操作&#xff0c;顺序写binlog&#xff0c;从库单线程去主库顺序读”写操作的binlog”&#xff0c;从库取到binlog在本地原样执行&#xff08;随机写&#xff09;&#xff0c;来保证主从数据逻辑上一致…

数据同步解决方案-canal

1、canal简介 canal可以用来监控数据库数据的变化&#xff0c;从而获得新增数据&#xff0c;或者修改的数据。 canal是应对阿里巴巴存在杭州和美国的双机房部署&#xff0c;存在跨机房同步的业务需求而提出的。 阿里系公司开始逐步的尝试基于数据库的日志解析&#xff0c;获…

数据库灾备解决方案

文章目录 行业背景解决方案优势核心产品灾备设计及技术指标DBS冷备热备份同城容灾同城容灾——复制加高可用同城容灾——A-S同城容灾——A-A更多信息 行业背景 数据是企业重要的生产资料&#xff0c;关键数据的丢失可能会给企业致命一击&#xff0c;因为数据是计算机系统存在的…

【解决方案 三十一】Navicat数据库结构同步

最近在开发过程中遇到一个问题&#xff1a;刚经历了一个两个月的长迭代&#xff0c;导致测试库已经被改的面目全非了&#xff0c;最关键的是所有的变更语句都没有记录&#xff0c;这上线的时候怎么办啊&#xff0c;一百多张表呢&#xff0c;幸好组里的老程序猿说用过一个工具&a…

Redis与数据库数据同步解决方案

本文转自&#xff1a;http://3gods.com/2016/06/23/Redis-Sync-DB.html 部分图片来自&#xff1a;http://blog.csdn.net/stubborn_cow/article/details/50586990 数据库同步到Redis 我们大多倾向于使用这种方式&#xff0c;也就是将数据库中的变化同步到Redis&#xff0c;这种…

跨数据库同步方案汇总

Datax 一般比较适合于全量数据同步&#xff0c;对全量数据同步效率很高&#xff08;任务可以拆分&#xff0c;并发同步&#xff0c;所以效率高&#xff09;&#xff0c;对于增量数据同步支持的不太好&#xff08;可以依靠时间戳定时调度来实现&#xff0c;但是不能做到实时&…

唯一性约束(unique)

在字段类型后面加一个unique&#xff0c;表示这个字段的值是唯一的&#xff0c;不能够有相同的值出现。 给某一列添加unique 给两个字段加唯一性约束&#xff0c;不同的方式的约束范围不同 联合加约束时只有两个字段的值都是一样的才会出错 单独加约束时&#xff0c;只要有一个…

ORA-00001: 违反唯一约束条件 的解决办法

最近在做项目是测试的时候出现了这种错误&#xff1a;ORA-00001: 违反唯一约束条件 其实说白了这种错误就是 数据库的ID值&#xff0c;小于他的next number 由于表的ID是唯一的&#xff0c;所以用 select max(id) from test 查的该ID大于Sequences里面的开始ID&#xff0c;所以…

ORA-00001: 违反唯一约束条件 -- 解决方法

1、错误 Caused by: java.sql.BatchUpdateException:ORA-00001: 违反唯一约束条件 (SOLEX.SYS_C0012537) 2、分析 ①通过SYS_C0012537找到对应的表 select a.constraint_name,a.constraint_type,b.column_name,b.table_name from user_constraints a inner join user_cons_col…

ORA-00001: 违反唯一约束条件解决方案(oracle 00001)

原因 主要原因是在进行插入INSERT或者更新UPDATE的时候&#xff0c;违反唯一约束&#xff0c;导致操作某行数据时出现重复的值。 解决方案 删除或者更改唯一约束。 修改你使用的插入INSERT或者更新UPDATE的语句。 如果当有多个表进行操作时&#xff0c;你不知道哪个表出现…

ORA-00001: 违反唯一约束条件(SOLEX.SYS_C0012537) --解决方法

1、错误 Caused by: java.sql.BatchUpdateException:ORA-00001: 违反唯一约束条件 (SOLEX.SYS_C0012537) 2、分析 ①通过SYS_C0012537找到对应的表 select a.constraint_name,a.constraint_type,b.column_name,b.table_name from user_constraints a inner join user_cons_colu…

SQL Server唯一约束

SQL Server唯一约束 SQL Server UNIQUE约束简介 SQL Server UNIQUE 约束用于确保存储在列或列组中的数据在表中的行中是唯一的。 以下语句创建一个表&#xff1a; hr.persons &#xff0c;其 email 邮件列中的数据在表的行中是唯一的&#xff1a; 在此语法中&#xff0c;将 U…

MYSQL 唯一约束

一、唯一约束&#xff08;UNIQUE&#xff09; 唯一约束用来保证一列&#xff08;或一组列&#xff09;中的数据是唯一的。类似与主键&#xff0c;但是有以下区别&#xff1a; 表可包含多个唯一约束&#xff0c;但每个表只允许一个主键。 唯一约束列可包含 NULL 值。唯一约束列…

oracle ORA-00001: 违反唯一约束条件

在一个平平常常的工作日&#xff0c;被这玩意坑了一下。 在一个日常删除插入的时候居然报了 违反唯一约束条件 的错&#xff0c;打断点也没发现问题&#xff0c;一路运行下去都没错&#xff0c;但是就是在方法的最后一个括号&#xff0c;报错了&#xff0c;搞的我一脸懵逼。 回…

ORACLE updata是提示违反唯一约束条件

如图所示&#xff0c;使用updata语句时提示违反唯一约束条件&#xff1a; 语句如下&#xff1a; update table_name set param1‘xxx’&#xff0c;param2‘xxx’ where param4‘xxx’; 解决办法&#xff1a; 一、能用Navicat for Oracle或者sqlplus连接oracle 这里就不用说…

ORACLE 数据库插入数据违反唯一性约束条件,如何删除约束条件

在同一个oracle 数据库中新建了多个不同的表空间&#xff0c;共享了同一个表空间结构&#xff0c;导致不同表空间产生了同名的约束条件&#xff0c;导致在插入数据的时候报错 违反唯一性约束条件 1.第一步在数据库的客户端查找报错的唯一性约束条件 select * from dba_const…