Zookeeper分布式锁

article/2025/10/13 6:15:54

实现一把分布式锁通常有很多方法,比较常见的有 Redis 和 Zookeeper。

Redis分布式锁可参考之前的文章:

  • Redisson 分布式锁原理分析:https://blog.csdn.net/qq_42402854/article/details/123342331

Zookeeper能实现分布式锁,是因为它有一个特性,就是多个线程去 Zookeeper 里面去创建同一个节点的时候,只会有一个线程执行成功。

锁可理解为 ZooKeeper 上的一个节点,

  • 当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
  • 当释放锁时,只需要将会话关闭,临时节点就删除了,即释放了锁。如果万一客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点也会自动删除掉。

Zookeeper 的分布式锁实现: 实现有两种方案:

  1. 基于临时节点实现,会产生羊群效应,不推荐。
  2. 基于临时顺序节点实现。

一般我们认为,ZooKeeper 的分布式锁是基于临时顺序节点,然后通过监听机制来实现的。即方案2。

一、基于临时节点实现

1、实现思路

在这里插入图片描述
锁节点:即锁对象

获取锁:锁请求者加锁,则创建锁节点。

  • 如果创建成功,那么加锁成功,
  • 如果创建失败,那么加锁失败,则等待获取锁(等待获取锁成功的客户端释放锁)。注意:这里锁请求者监听的都是 锁节点的删除操作。

释放锁:只需要将会话关闭,临时节点就删除了,即释放了锁。

羊群效应:如果所有的锁请求者都来监听锁持有者,当锁持有者的节点被删除以后,所有的锁请求者都会通知到,即都会同时去竞争锁,但是只有一个锁请求者能拿到锁。这就是羊群效应。

2、实现代码

一般不推荐使用。这里我们自己简单实现一下。使用的是 ZkClient客户端。

2.1 ZkClient客户端连接

public class ZkClientConnectUtils {private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开/*** 使用匿名监听类* @throws Exception*/public static ZkClient zKClientConnnect() throws Exception {ZkClient zkClient = new ZkClient(CONNECT_STR, 3000,60000);TimeUnit.SECONDS.sleep(3);return zkClient;}}

2.2 锁实现

2.2.1 锁接口

/*** 自定义zk分布式锁:定义通用锁接口*/
public interface ZKLock {/*** 加锁*/void lock();/*** 释放锁*/void unlock();
}

2.2.2 抽象锁对象

/*** 抽象锁对象:用到了模板方法设计模式,具体抽象方法由子类实现*/
public abstract class AbstractZKLock implements ZKLock {protected static String path = "/lock";protected ZkClient zkClient = null;public AbstractZKLock() {initClient();}public void initClient(){try {zkClient = createClient();} catch (Exception e) {e.printStackTrace();System.out.println("初始化 zk客户端连接失败,errorMessage=" + e.getMessage());}}/*** 交给子类创建* @return*/protected abstract ZkClient createClient()  throws Exception;/*** lock方法(模板方法设计模式):获取锁的方法*   1.如果锁获取成功,那么业务代码继续往下走。*   2.如果锁获取失败,lock方法需要等待重新获取锁*      2.1等待到了前面那个获取锁的客户端释放锁以后(zk监听机制),*      2.2然后再去重新获取锁*/@Overridepublic void lock() {// 尝试去获取锁if(tryLock()){System.out.println(Thread.currentThread().getName() + "--->获取锁成功!");}else {// 获取失败,在这里等待waitforlock();// 重新获取锁lock();}}@Overridepublic void unlock() {// 因为加锁创建的是临时节点,所以会话关闭,临时节点就删除了,即释放了锁zkClient.close();}/*** 获取锁* @return*/protected abstract boolean tryLock();/*** 获取失败,等待其他释放锁,重新获取锁*/protected abstract void waitforlock();
}

2.2.3 ZkClient客户端实现

/*** 基于ZkClient客户端实现锁:*/
public class ZkClientLock extends AbstractZKLock {private CountDownLatch cdl = null;@Overrideprotected ZkClient createClient() throws Exception{return ZkClientConnectUtils.zKClientConnnect();}/*** 尝试获取锁* @return*/@Overrideprotected boolean tryLock() {try {// 加锁:创建临时节点,创建成功表示加锁成功,否则加锁失败。zookeeper 的特性,节点名不能重复,否则创建失败。if(zkClient.exists(path)) {zkClient.delete(path);}//创建临时节点zkClient.createEphemeral(path);return true;} catch (RuntimeException e) {return false;}}/*** 等待获取锁:* 等前面那个获取锁成功的客户端释放锁* 没有获取到锁的客户端都会走到这里* 1、没有获取到锁的要注册对 path节点的watcher* 2、这个方法需要等待*/@Overrideprotected void waitforlock() {IZkDataListener iZkDataListener = new IZkDataListener() {@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}// 一旦 path节点被删除(释放锁)以后,就会触发这个方法@Overridepublic void handleDataDeleted(String dataPath) throws Exception {// 让等待的代码不再等待了// 即 waitforlock方法执行结束,重新去获取锁if (cdl != null) {cdl.countDown();}}};// 注册对 path节点的watcherzkClient.subscribeDataChanges(path, iZkDataListener);// 等待if (zkClient.exists(path)) {cdl = new CountDownLatch(1);try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}}// 取消该客户端的订阅关系zkClient.unsubscribeDataChanges(path, iZkDataListener);}
}

2.3 业务代码

public class OrderService {private OrderNumGenereteUtils orderNumFactory = new OrderNumGenereteUtils();// ZkClient分布式锁private ZKLock lock = new ZkClientLock();/*** 创建订单号,模拟业务*/public void createOrderNum() {lock.lock();String orderNum = generateOrderNum();System.out.println(Thread.currentThread().getName() + "创建了订单号:[" + orderNum + "]");lock.unlock();}/*** 生成时间格式的订单编号* @return*/private static int orderNum = 0;public String generateOrderNum() {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");return simpleDateFormat.format(new Date()) + ++orderNum;}}

2.4 测试

public class ZkClientLockTest {private static Integer count = 50;private static CountDownLatch cdl = new CountDownLatch(count);public static void main(String[] args) throws InterruptedException {for (int i = 0; i < count; i++) {new Thread(new Runnable() {@Overridepublic void run() {// 模拟50个并发同时去创建订单号。OrderService orderService = new OrderService();try {//线程运行起来时先等待。cdl.await();} catch (InterruptedException e) {e.printStackTrace();}orderService.createOrderNum();}}).start();cdl.countDown();}TimeUnit.MINUTES.sleep(3);}
}

并发模拟 50个线程同时去创建订单号。运行ok。
在这里插入图片描述

二、基于临时顺序节点实现

1、实现思路

在这里插入图片描述
锁节点:即锁对象

获取锁:锁请求者加锁,则创建锁节点。

  • 如果创建成功,那么加锁成功,
  • 如果创建失败,那么加锁失败,则等待它的前一个节点释放锁。注意:这里锁请求者监听的是它上一个节点的删除操作。

释放锁:只需要将会话关闭,临时节点就删除了,即释放了锁。

2、示例代码

在实际的开发中,建议直接使用 Curator客户端中的各种官方实现的分布式锁。我们没必要“重复造轮子“”。

Curator客户端提供的 几种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁

下面我们以 InterProcessMutex为例。

2.1 Curator客户端连接

前面文章有介绍过 Curator客户端的使用。

public class CuratorClientConnectUtils {private static String CONNECT_STR = "192.168.198.110:2181"; //集群模式下用逗号隔开private static class InnerClass{private static CuratorFramework client = clientConnect2();}public static CuratorFramework getInstance(){return InnerClass.client;}public static void main(String[] args){//CuratorFramework client = clientConnect1();CuratorFramework client = clientConnect2();//启动客户端,必须要有client.start();System.out.println("==CuratorFramework==" + client);}/*** 使用工厂类CuratorFrameworkFactory的静态newClient()方法。* @throws Exception*/public static CuratorFramework clientConnect1() {// 重试策略, 失败重连3次RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);//创建客户端实例CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);return client;}/*** 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。* @throws Exception*/public static CuratorFramework clientConnect2() {// 重试策略, 失败重连3次RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_STR).sessionTimeoutMs(3000) // 会话超时时间.connectionTimeoutMs(50000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator_Workspace") // 指定隔离名称,表示所有节点的操作都会在该工作空间下进行。不指定时,使用自定义的节点path.build();return client;}public static void closeZKClient() {if (InnerClass.client != null) {InnerClass.client.close();}}
}

2.2 业务代码

public class OrderService {/*** 分布式可重入排它锁*/public InterProcessMutex interProcessMutex;// 简单点,构造方法注入锁实例public OrderService(InterProcessMutex interProcessMutex) {this.interProcessMutex = interProcessMutex;}/*** 创建订单号,模拟业务*/public void createOrderNum() throws Exception {//加锁interProcessMutex.acquire();String orderNum = generateOrderNum();System.out.println(Thread.currentThread().getName() + "创建了订单号:[" + orderNum + "]");//释放锁interProcessMutex.release();}/*** 生成时间格式的订单编号* @return*/private static int orderNum = 0;public String generateOrderNum() {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");return simpleDateFormat.format(new Date()) + ++orderNum;}
}

2.3 测试

public class InterProcessMutexTest {private static Integer count = 50;private static CountDownLatch cdl = new CountDownLatch(count);public static void main(String[] args) {CuratorFramework client = CuratorClientConnectUtils.getInstance();client.start();InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/curator/lock");for (int i = 0; i < count; i++) {new Thread(new Runnable() {@Overridepublic void run() {// 模拟50个并发同时去创建订单号。OrderService orderService = new OrderService(interProcessMutex);try {//线程运行起来时先等待。cdl.await();orderService.createOrderNum();} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}}).start();cdl.countDown();}}}

并发模拟 50个线程同时去创建订单号。运行ok。
在这里插入图片描述
注意:我们可以通过锁的构造方法传入path。不传时底层path会有默认值。
在这里插入图片描述

下面简单看一下 Curator客户端实现分布式锁的源码。

3、加锁源码

3.1 查看acquire() -> internalLock()方法
在这里插入图片描述

3.2 查看 attemptLock()方法
在这里插入图片描述
上面 createsTheLock()方法和 internalLockLoop()方法是重点。底层都是 Curator客户端的API使用。

3.2.1 createsTheLock()方法:

级联创建临时有序节点,即获取锁逻辑。返回节点路径。
在这里插入图片描述

3.2.2 internalLockLoop()方法

这个方法实现了 监听的机制,自行查看。

注意:usingWatcher是一次性监听。

client.getData().usingWatcher(watcher).forPath(previousSequencePath);

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception{boolean     haveTheLock = false;boolean     doDelete = false;try{if ( revocable.get() != null ){client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){List<String>        children = getSortedChildren();String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slashPredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if ( predicateResults.getsTheLock() ){haveTheLock = true;}else{String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this){try{// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).forPath(previousSequencePath);if ( millisToWait != null ){millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if ( millisToWait <= 0 ){doDelete = true;    // timed out - delete our nodebreak;}wait(millisToWait);}else{wait();}}catch ( KeeperException.NoNodeException e ){// it has been deleted (i.e. lock released). Try to acquire again}}}}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);doDelete = true;throw e;}finally{if ( doDelete ){deleteOurPath(ourPath);}}return haveTheLock;}

4、解锁源码

4.1 查看 release()方法
在这里插入图片描述

4.2 查看releaseLock()方法
在这里插入图片描述

– 求知若饥,虚心若愚。


http://chatgpt.dhexx.cn/article/I3gKUTtB.shtml

相关文章

分布式锁

分布式锁实践 在不同进程需要互斥地访问共享资源时&#xff0c;分布式锁是一种非常有用的技术手段。有很多三方库和文章描述如何用Redis实现一个分布式锁管理器&#xff0c;但是这些库实现的方式差别很大&#xff0c;而且很多简单的实现其实只需采用稍微增加一点复杂的设计就可…

分布式系列之分布式锁几种实现机制

在分布式系统中&#xff0c;分布式锁用来解决分布式系统中多线程、多进程在不同机器上共享资源访问的问题。本文简要介绍分布式锁的四种实现机制&#xff0c;包括数据库、Redis缓存、Zookeeper和Etcd&#xff0c;以加深了解。 1、分布式锁介绍 在单体应用中&#xff0c;通过锁…

三种分布式锁

----------本文为学习记录如有错误帮忙指正 一、什么是分布式锁&#xff1f; 在单机系统下&#xff0c;如果多个线程同时访问一个变量或者代码片段就会产生多线程问题。&#xff08;被访问的变量或者代码片段被称之为临界区域&#xff09;这时我们就需要让所有线程按顺序一个一…

Redis实现分布式锁

目录 一、前言 为什么需要分布式锁&#xff1f; 二、基于redis实现分布式锁 为什么redis可以实现分布式锁&#xff1f; 如何实现&#xff1f; 锁的获取 锁的释放 三、如何避免死锁&#xff1f;锁的过期时间如何设置&#xff1f; 避免死锁 锁过期处理 释放其他服务的锁…

什么是分布式锁?几种分布式锁分别是怎么实现的?

一、什么是分布式锁&#xff1a; 1、什么是分布式锁&#xff1a; 分布式锁&#xff0c;即分布式系统中的锁。在单体应用中我们通过锁解决的是控制共享资源访问的问题&#xff0c;而分布式锁&#xff0c;就是解决了分布式系统中控制共享资源访问的问题。与单体应用不同的是&am…

软件需求最佳实践笔记(一)

1.软件需求最佳实践笔记 | 需求框架 前言&#xff1a;SERU是一套系统全面的需求方法论&#xff0c;可指导我们日常的软件需求工作。曾参加过徐峰老师软件需求最佳实践课程的培训&#xff0c;收益颇多&#xff0c;现通过笔记形式整理出来&#xff0c;以期与具有同样需求的读者共…

声音信号基音提取算法基频和谐波处理分析

1、内容简介 略 293-可以交流、咨询、答疑 2、内容说明 略 一、 实验原理&#xff1a; 傅里叶变换建立了信号频谱的概念。所谓傅里叶分析即分析信号的频谱(频率构成)、频带宽度等。要想合成出一段音乐,就要了解该段音乐的基波频率、谐波构成等。因此,必须采用傅里叶变换这…

软件工程—需求分析阶段

第一步、需求获取 为了保证能全面地获取信息&#xff0c;以更好地服务于产品设计和迭代&#xff0c;产品经理必须利用内部外部等多种渠道来获取用户需求。并且因渠道差异&#xff0c;产品经理所采取的方式与方法也相应会有所差异&#xff0c;所以产品经理还必须根据不同的渠道…

作业1.1利用Audacity软件分析音频

文章目录 前言实验内容实验步骤实验结果结果分析总结 前言 Audacity软件分析其余格式的音频时需要安装FFmpeg库&#xff0c;所以我们下载一个格式转换软件将音频转为MP3格式进行处理。语音信号具有短时平稳性&#xff0c;即在一个短时间范围内&#xff08;10-30ms&#xff09;…

C++ OBS源码分析与屏幕录制软件开发视频教程

本课程主要讲解OBS源码的编译&#xff0c;OBS功能实现&#xff0c;初始化&#xff0c;显示器录制&#xff0c;窗口的实现录制&#xff0c;以及录制模块源码详细分析&#xff0c;最后基于OBS源码开发了一个录制软件&#xff0c;界面如下&#xff1a; 主要有如下功能 &#xff0…

酒店管理系统-需求分析报告

目录 1.引言 1.1编制的目的 1.2术语定义 1.3参考资料 1.4相关文档 2.概述 2.1项目的描述 2.2项目的功能 2.3用户特点 3.具体需求 3.1业务需求 3.1.1主要业务 3.1.2未来增长预测 3.2用户需求 3.3应用需求 3.3.1系统功能 3.3.2主要应用及使用方式 3.4网络基本结构…

基于matlab的声波分析研究,基于MATLAB的声音信号分析与处理(共13页)

设计了一套信号采集与处理系统&#xff0c;建立了傅立叶变换算法模型&#xff0c;可获得其频谱图进行频谱分析&#xff0c;建立滤波器的设计算法模型设计了一个声音滤波器&#xff0c;建立滤波算法模型可对声音信号进行滤波。本套系统的算法建立都是基于MATLAB软件&#xff0c;…

分析评估和定位声音质量

/** * author wangdaopo * email 3168270295qq.com */ 影响音频质量和稳定性的因素 音质好坏的评价&#xff0c;响度、音高、音色&#xff0c; 测试&#xff0c;你的语音引擎是基本可用的&#xff0c;客观评测软件是RMAA&#xff08;RightMark Audio Analyzer&#xff1b;比…

声学计算机软件,常用声学仿真软件汇总

声学仿真软件根据计算原理不同大致分为以下几类&#xff1a; 一、电力声类比法 将振动系统和声学系统转化为等效电路&#xff0c;是一种0维的参数化建模方法&#xff1b; 优点&#xff1a;计算速度快&#xff1b; 缺点&#xff1a;无法预测高频响应以及复杂声波叠加&#xff1b…

荔枝软件如何测试声音,荔枝如何测自己的声音 荔枝测自己的声音方法

您可能感兴趣的话题&#xff1a; 荔枝 测自己的声音 核心提示&#xff1a;  荔枝APP有一个特色功能——声鉴卡&#xff0c;声鉴卡可以用来测试用户的音色&#xff0c;比如女神音、御姐音、少年音等等&#xff0c;很多人都想用声鉴卡测试一下自己的音色&#xff0c;却不知道荔枝…

电脑版频谱测试软件,电脑实时声音频谱PC Sound Spectrum

PC Sound Spectrum是完全免费提供给广大用户们使用的一个电脑声音实时频谱显示软件&#xff0c;可以将电脑声卡里所有的声音捕捉并转换为24段动态频谱显示&#xff0c;也可透明显示&#xff0c;拥有鼠标穿透效果&#xff0c;而且不用安装&#xff0c;没有插件&#xff0c;体积小…

软件产品案例分析

软件产品案例分析 第一部分&#xff1a; 评测&#xff1a; 上手体验&#xff1a; 说实话&#xff0c;在老师布置这个作业之前我确实不知道有K米这个APP&#xff0c;我想这是很少去KTV的原因吧。。。不过在接到这个作业后&#xff0c;我就去百度了普及了一下这个app的相关知识。…

基于AI的恶意软件分析技术(3)

一篇综述&#xff1a;用于检测和分类恶意软件的机器学习的兴起&#xff1a;研究的发展、趋势和挑战 阅读The rise of machine learning for detection and classification of malware: Research developments, trends and challenges翻译&笔记 原文&#xff1a;https://w…

吉林大学软件需求分析 Software Requirement Analysis

文章目录 吉林大学软件需求分析 Software Requirement Analysis缩写/术语Chapter 1 Introduction1 Software and Engineering1.1Software1.2软件工程1.3需求对软件项目的影响 2 Software Requirements2.1问题域2.2需求 3 Requirements Engineering3.1需求工程的历史3.2需求工程…

splunk 日志分析软件 简介

目录 Splunk总体介绍 简介 Splunk是什么 Splunk做什么 Splunk如何做 应用场景 日志管理 为机器数据建立索引 搜索、关联、调查 钻取分析 监控&告警 报表和仪表盘 IT运维监控 IT运维监控视图 丰富的App和插件 安全和欺诈 安全神经中心 安全挑战 高级…