ReentrantLock是一个互斥锁,它具有synchronized相同的能力;但相比之下,ReentrantLock扩展性更强,比如实现了公平锁。
下面详细拆解下ReentrantLock的公平锁和非公平锁的实现。
JDK版本:1.8.0_40
公平锁
先看ReentrantLock两个构造方法:
// 默认非公平
public ReentrantLock() {sync = new NonfairSync();
}// 根据传参来实现公平或非公平锁
public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();
}
因此,要想实现公平锁,只需要传参为true即可。
先来看一个测试类,后面都将围绕该测试类来分享:
public class TestReentrantLock {private static ReentrantLock lock = new ReentrantLock(true);public static void main(String[] args) {Runnable runnable = () -> {process();};Thread t0 = new Thread(runnable);Thread t1 = new Thread(runnable);Thread t2 = new Thread(runnable);t0.start();t1.start();t2.start();}public static void process() {String name = Thread.currentThread().getName();System.out.println(name +" 尝试获取锁");lock.lock();try {System.out.println(name +" 已获取锁");sleep(1000L);} finally {System.out.println(name +" 释放锁");lock.unlock();}}private static void sleep(Long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {// ignore}}
}
输出:
Thread-0 尝试获取锁
Thread-2 尝试获取锁
Thread-1 尝试获取锁
Thread-0 已获取锁
Thread-0 释放锁
Thread-2 已获取锁
Thread-2 释放锁
Thread-1 已获取锁
Thread-1 释放锁
根据这个测试类,通过画图的方式,一起来看看每一步都执行了什么。
lock()过程
先来看看lock()过程
1)首先创建一个公平锁,ReentrantLock lock = new ReentrantLock(true);
2)t0尝试获取锁,lock.lock(),该方法会调用父类的acquire方法
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
tryAcquire(arg)方法:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 1.判断state是否为0,因为t0是第一个进来的,此时state为0if (c == 0) {// 2.hasQueuedPredecessors() 队列是否有内容,此时没有内容// 3.compareAndSetState(0, acquires) 尝试将state改成1,我们假设t0线程修改成功if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {// 4.将成员变量exclusiveOwnerThread修改为当前线程setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
由于该方法返回了true,!tryAcquire(arg)为false,acquire(int arg)方法直接返回了。
t0拿到了锁,开始处理业务。此时该对象锁如图:(绿色表明与上一个图的变化,下面也是一样)
3)我们假设t0还未释放锁,接着由t1尝试获取锁,lock.lock(),再来看acquire方法
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
还是首先看tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 1.判断state是否为0,因为t0已经将该值改成了1if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// exclusiveOwnerThread 也被改成了t0else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
因未释放锁,因此该方法会返回false。!tryAcquire(arg)为true,接着会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法。
先看看addWaiter(Node.EXCLUSIVE)做了什么:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;// 此时tail为null,不会执行这个方法if (pred != null) {// 如果t1已经入队,t2则直接加入到队尾,tail指向t2node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}// 入队enq(node);return node;
}private Node enq(final Node node) {for (;;) {// 记录尾部nodeNode t = tail;if (t == null) { // Must initialize// 第1次循环,tail为null,给head和tail赋值一个空的Nodeif (compareAndSetHead(new Node()))tail = head;} else {// 非第1次循环,将传入的node的prev指向刚新建的nodenode.prev = t;// 将传入的node赋值给tail,这里会存在竞争,如果有多个同时修改,那么未修改成功的将进入下一次循环if (compareAndSetTail(t, node)) {// 将之前记录的尾部的next指向传入的node,也就是指向尾部t.next = node;return t;}}}
}
入队,第1次循环:
第2次循环,将t1入队:
如果在第2次循环的时候,存在竞争,则未竞争成功的会继续第3次循环,最终都将入队:
入队成功之后,我们看下acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// 获取当前node的prev节点final Node p = node.predecessor();// 如上图所示,如果当前节点是t1的话,会再次尝试获取锁。if (p == head && tryAcquire(arg)) {// 能获取到锁,表明t0已经释放了锁,此时将head指向当前nodesetHead(node);// prev节点next指向置为空,方便回收p.next = null; // help GCfailed = false;return interrupted;}// 如果t0未释放,或prev不是head// shouldParkAfterFailedAcquire(p, node) 该方法是给每个节点的prev节点的waitStatus设置为Node.SIGNAL,即-1// parkAndCheckInterrupt() 暂停线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
该方法每个节点的prev节点的waitStatus设置为Node.SIGNAL,即-1,并且使线程暂停,如图:
到此,lock()到此结束,除t0正在执行外,其他都在等待。
unlock()过程
再来看看unlock过程
1)t0业务逻辑执行完毕,开始释放锁
public final boolean release(int arg) {// 尝试释放锁if (tryRelease(arg)) {Node h = head;// 判断head是否不为0,在lock的时候,waitStatus都已改成-1if (h != null && h.waitStatus != 0)// 唤醒线程unparkSuccessor(h);return true;}return false;
}protected final boolean tryRelease(int releases) {// 直接把state减1int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 不为0,则该线程之前存在重入锁情况if (c == 0) {// 为0,则当前线程占用的锁全部释放free = true;// 将锁的占有线程置为nullsetExclusiveOwnerThread(null);}// 设置新的state值,因为此时不会有竞争,可直接设置setState(c);return free;
}private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)// 将head waitStatus改成0compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {// head的next的node节点waitStatus大于0,即已取消,// 则从tail开始循环获取到队列头部第一个waitStatus<=0的值s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)// 唤醒线程,如上图,唤醒的是t1线程LockSupport.unpark(s.thread);
}
2)t1被唤醒之后,我们再来看acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {// t1的prev是headfinal Node p = node.predecessor();if (p == head && tryAcquire(arg)) {// 因t0已释放锁,t1获取到锁,exclusiveOwnerThread=t1, state=1// 将head指向为当前node,即t1所在node,并把该node的prev、thread都置为nullsetHead(node);// prev节点next指向置为空,方便回收p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())// 如果是中断唤醒的,interrupted设置为trueinterrupted = true;}} finally {if (failed)cancelAcquire(node);}
}
t1拿到了锁,exclusiveOwnerThread值为t1, 之前创建的空Node对象没有任何指向,将会被回收掉。
如图:
3)如果t1是被其他线程中断的,parkAndCheckInterrupt()会返回true,interrupted则会设置为true。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);// 该方法会返回是否被中断,以及清理中断状态return Thread.interrupted();
}
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))// 被中断后,会执行该方法;// 因为线程的中断状态已被清理,此处会再次执行中断selfInterrupt();
}
因Thread.interrupted()方法会清理中断状态,执行selfInterrupt()方法再次中断下,这样业务逻辑执行时拿到的线程状态就是中断的。
总结下加锁和解锁过程:
1.新加入的线程会尝试获取锁(tryAcquire);
2.获取不到,则入队(addWaiter);
3.入队之后,队头会再次尝试获取锁,获取不到,则和其他线程一样暂停(parkAndCheckInterrupt);
4.有线程释放锁,唤醒队头线程(unparkSuccessor);
5.队头线程尝试获取锁,退出自旋(acquireQueued),开始执行业务逻辑。
非公平锁
非公平锁和公平锁的区别:主要在于获取锁的时候要不要先判断是否已有线程在等待。非公平锁,有线程进来,直接尝试获取一次锁。
我们看下非公平锁获取锁的过程:
final void lock() {// 先尝试获取锁if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);
}public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();
}
tryAcquire方法会调nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 同样也是先尝试获取锁if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
每次都是先尝试compareAndSetState(0, acquires)获取锁。
再来看下公平锁获取过程:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 先判断队列是否有线程在等待if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
}
公平锁,会判断队列是否有线程在等待。
以上是ReentrantLock详解,有问题欢迎沟通。