实现一把分布式锁通常有很多方法,比较常见的有 Redis 和 Zookeeper。
Redis分布式锁可参考之前的文章:
- Redisson 分布式锁原理分析:https://blog.csdn.net/qq_42402854/article/details/123342331
Zookeeper能实现分布式锁,是因为它有一个特性,就是多个线程去 Zookeeper 里面去创建同一个节点的时候,只会有一个线程执行成功。
锁可理解为 ZooKeeper 上的一个节点,
- 当需要获取锁时,就在这个锁节点下创建一个临时顺序节点。当存在多个客户端同时来获取锁,就按顺序依次创建多个临时顺序节点,但只有排列序号是第一的那个节点能获取锁成功,其他节点则按顺序分别监听前一个节点的变化,当被监听者释放锁时,监听者就可以马上获得锁。
- 当释放锁时,只需要将会话关闭,临时节点就删除了,即释放了锁。如果万一客户端获取到锁之后突然挂掉(Session连接断开),那么这个临时节点也会自动删除掉。
Zookeeper 的分布式锁实现: 实现有两种方案:
- 基于临时节点实现,会产生羊群效应,不推荐。
- 基于临时顺序节点实现。
一般我们认为,ZooKeeper 的分布式锁是基于临时顺序节点,然后通过监听机制来实现的
。即方案2。
一、基于临时节点实现
1、实现思路
锁节点:即锁对象
获取锁:锁请求者加锁,则创建锁节点。
- 如果创建成功,那么加锁成功,
- 如果创建失败,那么加锁失败,则等待获取锁(等待获取锁成功的客户端释放锁)。
注意:
这里锁请求者监听的都是 锁节点的删除操作。
释放锁:只需要将会话关闭,临时节点就删除了,即释放了锁。
羊群效应:如果所有的锁请求者都来监听锁持有者,当锁持有者的节点被删除以后,所有的锁请求者都会通知到,即都会同时去竞争锁,但是只有一个锁请求者能拿到锁。这就是羊群效应。
2、实现代码
一般不推荐使用。这里我们自己简单实现一下。使用的是 ZkClient客户端。
2.1 ZkClient客户端连接
public class ZkClientConnectUtils {private static String CONNECT_STR = "192.168.xxx.xxx:2181"; //集群模式下用逗号隔开/*** 使用匿名监听类* @throws Exception*/public static ZkClient zKClientConnnect() throws Exception {ZkClient zkClient = new ZkClient(CONNECT_STR, 3000,60000);TimeUnit.SECONDS.sleep(3);return zkClient;}}
2.2 锁实现
2.2.1 锁接口
/*** 自定义zk分布式锁:定义通用锁接口*/
public interface ZKLock {/*** 加锁*/void lock();/*** 释放锁*/void unlock();
}
2.2.2 抽象锁对象
/*** 抽象锁对象:用到了模板方法设计模式,具体抽象方法由子类实现*/
public abstract class AbstractZKLock implements ZKLock {protected static String path = "/lock";protected ZkClient zkClient = null;public AbstractZKLock() {initClient();}public void initClient(){try {zkClient = createClient();} catch (Exception e) {e.printStackTrace();System.out.println("初始化 zk客户端连接失败,errorMessage=" + e.getMessage());}}/*** 交给子类创建* @return*/protected abstract ZkClient createClient() throws Exception;/*** lock方法(模板方法设计模式):获取锁的方法* 1.如果锁获取成功,那么业务代码继续往下走。* 2.如果锁获取失败,lock方法需要等待重新获取锁* 2.1等待到了前面那个获取锁的客户端释放锁以后(zk监听机制),* 2.2然后再去重新获取锁*/@Overridepublic void lock() {// 尝试去获取锁if(tryLock()){System.out.println(Thread.currentThread().getName() + "--->获取锁成功!");}else {// 获取失败,在这里等待waitforlock();// 重新获取锁lock();}}@Overridepublic void unlock() {// 因为加锁创建的是临时节点,所以会话关闭,临时节点就删除了,即释放了锁zkClient.close();}/*** 获取锁* @return*/protected abstract boolean tryLock();/*** 获取失败,等待其他释放锁,重新获取锁*/protected abstract void waitforlock();
}
2.2.3 ZkClient客户端实现
/*** 基于ZkClient客户端实现锁:*/
public class ZkClientLock extends AbstractZKLock {private CountDownLatch cdl = null;@Overrideprotected ZkClient createClient() throws Exception{return ZkClientConnectUtils.zKClientConnnect();}/*** 尝试获取锁* @return*/@Overrideprotected boolean tryLock() {try {// 加锁:创建临时节点,创建成功表示加锁成功,否则加锁失败。zookeeper 的特性,节点名不能重复,否则创建失败。if(zkClient.exists(path)) {zkClient.delete(path);}//创建临时节点zkClient.createEphemeral(path);return true;} catch (RuntimeException e) {return false;}}/*** 等待获取锁:* 等前面那个获取锁成功的客户端释放锁* 没有获取到锁的客户端都会走到这里* 1、没有获取到锁的要注册对 path节点的watcher* 2、这个方法需要等待*/@Overrideprotected void waitforlock() {IZkDataListener iZkDataListener = new IZkDataListener() {@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}// 一旦 path节点被删除(释放锁)以后,就会触发这个方法@Overridepublic void handleDataDeleted(String dataPath) throws Exception {// 让等待的代码不再等待了// 即 waitforlock方法执行结束,重新去获取锁if (cdl != null) {cdl.countDown();}}};// 注册对 path节点的watcherzkClient.subscribeDataChanges(path, iZkDataListener);// 等待if (zkClient.exists(path)) {cdl = new CountDownLatch(1);try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}}// 取消该客户端的订阅关系zkClient.unsubscribeDataChanges(path, iZkDataListener);}
}
2.3 业务代码
public class OrderService {private OrderNumGenereteUtils orderNumFactory = new OrderNumGenereteUtils();// ZkClient分布式锁private ZKLock lock = new ZkClientLock();/*** 创建订单号,模拟业务*/public void createOrderNum() {lock.lock();String orderNum = generateOrderNum();System.out.println(Thread.currentThread().getName() + "创建了订单号:[" + orderNum + "]");lock.unlock();}/*** 生成时间格式的订单编号* @return*/private static int orderNum = 0;public String generateOrderNum() {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");return simpleDateFormat.format(new Date()) + ++orderNum;}}
2.4 测试
public class ZkClientLockTest {private static Integer count = 50;private static CountDownLatch cdl = new CountDownLatch(count);public static void main(String[] args) throws InterruptedException {for (int i = 0; i < count; i++) {new Thread(new Runnable() {@Overridepublic void run() {// 模拟50个并发同时去创建订单号。OrderService orderService = new OrderService();try {//线程运行起来时先等待。cdl.await();} catch (InterruptedException e) {e.printStackTrace();}orderService.createOrderNum();}}).start();cdl.countDown();}TimeUnit.MINUTES.sleep(3);}
}
并发模拟 50个线程同时去创建订单号。运行ok。
二、基于临时顺序节点实现
1、实现思路
锁节点:即锁对象
获取锁:锁请求者加锁,则创建锁节点。
- 如果创建成功,那么加锁成功,
- 如果创建失败,那么加锁失败,则等待它的前一个节点释放锁。
注意:
这里锁请求者监听的是它上一个节点的删除操作。
释放锁:只需要将会话关闭,临时节点就删除了,即释放了锁。
2、示例代码
在实际的开发中,建议直接使用 Curator客户端中的各种官方实现的分布式锁。我们没必要“重复造轮子“”。
Curator客户端提供的 几种锁方案:
- InterProcessMutex:分布式可重入排它锁
- InterProcessSemaphoreMutex:分布式排它锁
- InterProcessReadWriteLock:分布式读写锁
下面我们以 InterProcessMutex为例。
2.1 Curator客户端连接
前面文章有介绍过 Curator客户端的使用。
public class CuratorClientConnectUtils {private static String CONNECT_STR = "192.168.198.110:2181"; //集群模式下用逗号隔开private static class InnerClass{private static CuratorFramework client = clientConnect2();}public static CuratorFramework getInstance(){return InnerClass.client;}public static void main(String[] args){//CuratorFramework client = clientConnect1();CuratorFramework client = clientConnect2();//启动客户端,必须要有client.start();System.out.println("==CuratorFramework==" + client);}/*** 使用工厂类CuratorFrameworkFactory的静态newClient()方法。* @throws Exception*/public static CuratorFramework clientConnect1() {// 重试策略, 失败重连3次RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);//创建客户端实例CuratorFramework client = CuratorFrameworkFactory.newClient(CONNECT_STR, retryPolicy);return client;}/*** 使用工厂类CuratorFrameworkFactory的静态builder构造者方法。* @throws Exception*/public static CuratorFramework clientConnect2() {// 重试策略, 失败重连3次RetryPolicy retryPolicy = new ExponentialBackoffRetry(100, 3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString(CONNECT_STR).sessionTimeoutMs(3000) // 会话超时时间.connectionTimeoutMs(50000) // 连接超时时间.retryPolicy(retryPolicy).namespace("Curator_Workspace") // 指定隔离名称,表示所有节点的操作都会在该工作空间下进行。不指定时,使用自定义的节点path.build();return client;}public static void closeZKClient() {if (InnerClass.client != null) {InnerClass.client.close();}}
}
2.2 业务代码
public class OrderService {/*** 分布式可重入排它锁*/public InterProcessMutex interProcessMutex;// 简单点,构造方法注入锁实例public OrderService(InterProcessMutex interProcessMutex) {this.interProcessMutex = interProcessMutex;}/*** 创建订单号,模拟业务*/public void createOrderNum() throws Exception {//加锁interProcessMutex.acquire();String orderNum = generateOrderNum();System.out.println(Thread.currentThread().getName() + "创建了订单号:[" + orderNum + "]");//释放锁interProcessMutex.release();}/*** 生成时间格式的订单编号* @return*/private static int orderNum = 0;public String generateOrderNum() {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss-");return simpleDateFormat.format(new Date()) + ++orderNum;}
}
2.3 测试
public class InterProcessMutexTest {private static Integer count = 50;private static CountDownLatch cdl = new CountDownLatch(count);public static void main(String[] args) {CuratorFramework client = CuratorClientConnectUtils.getInstance();client.start();InterProcessMutex interProcessMutex = new InterProcessMutex(client, "/curator/lock");for (int i = 0; i < count; i++) {new Thread(new Runnable() {@Overridepublic void run() {// 模拟50个并发同时去创建订单号。OrderService orderService = new OrderService(interProcessMutex);try {//线程运行起来时先等待。cdl.await();orderService.createOrderNum();} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {e.printStackTrace();}}}).start();cdl.countDown();}}}
并发模拟 50个线程同时去创建订单号。运行ok。
注意:
我们可以通过锁的构造方法传入path。不传时底层path会有默认值。
下面简单看一下 Curator客户端实现分布式锁的源码。
3、加锁源码
3.1 查看acquire() -> internalLock()方法
3.2 查看 attemptLock()方法
上面 createsTheLock()方法和 internalLockLoop()方法是重点。底层都是 Curator客户端的API使用。
3.2.1 createsTheLock()方法:
级联创建临时有序节点,即获取锁逻辑。返回节点路径。
3.2.2 internalLockLoop()方法
这个方法实现了 监听的机制,自行查看。
注意:
usingWatcher是一次性监听。
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception{boolean haveTheLock = false;boolean doDelete = false;try{if ( revocable.get() != null ){client.getData().usingWatcher(revocableWatcher).forPath(ourPath);}while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ){List<String> children = getSortedChildren();String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slashPredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);if ( predicateResults.getsTheLock() ){haveTheLock = true;}else{String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();synchronized(this){try{// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leakclient.getData().usingWatcher(watcher).forPath(previousSequencePath);if ( millisToWait != null ){millisToWait -= (System.currentTimeMillis() - startMillis);startMillis = System.currentTimeMillis();if ( millisToWait <= 0 ){doDelete = true; // timed out - delete our nodebreak;}wait(millisToWait);}else{wait();}}catch ( KeeperException.NoNodeException e ){// it has been deleted (i.e. lock released). Try to acquire again}}}}}catch ( Exception e ){ThreadUtils.checkInterrupted(e);doDelete = true;throw e;}finally{if ( doDelete ){deleteOurPath(ourPath);}}return haveTheLock;}
4、解锁源码
4.1 查看 release()方法
4.2 查看releaseLock()方法
– 求知若饥,虚心若愚。