目录
一、concurrent并发包
二、ReentrantLock(可重入锁)
1、锁状态中断与可重入
2、尝试非阻塞地获取锁
3、等待可中断
4、设置公平锁
三、CountDownLatch(门栓)
四、cyclicBarrier(栅栏)
五、phaser(可以控制的栅栏)
六、ReadWriteLock(读写锁)
七、Semphore(指示灯)
八、Exchanger(交换)
九、LockSupport(可重入锁)
一、concurrent并发包
在JDK1.5之前,Java中要进行业务并发时,不管什么情况下,都需要由程序员独立完成代码实现,此时在进行一些并发设计时需要考虑性能、死锁、公平性、资源管理以及如何避免线程安全性方面带来的危害等,往往会采用一些较为复杂的安全策略,加重了程序员的开发负担。.在JDK1.5出现之后,java.util.concurrent工具包作为简化并发出现。concurrent提供了多种并发模型使我们在相应的情况下大大减少了开发的负担。下面对其中常见实现类进行讲解。
二、ReentrantLock(可重入锁)
ReentrantLock意思是“可重入锁”。ReentrantLock是唯一实现了Lock接口的类,所以Lock接口的一些重要特性ReentrantLock都会拥有。底层采用的是cas实现的,其实大多数的锁底层都是cas实现的。下图为基本特性以及常用方法:
特性 | 描述 |
---|---|
尝试非阻塞地获取锁 | 当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁 |
锁状态中断 | 获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放 |
等待可中断 | 是指当持有锁的线程长期不释放锁的时候,正在等待的线程可以选择放弃等待,改为处理其他事情。可中断特性对处理执行时间非常长的同步块很有帮助 |
设置公平锁 | 默认非公平锁。但是可以进行设置 |
1、锁状态中断与可重入
ReentrantLock作为可重入锁,首先要具备可重入性,同时作为Lock的实现类,和Lock一样必须主动去释放锁,并且在发生异常时,不会自动释放锁。因此一般来说,使用Lock必须在try{}catch{}块中进行,并且将释放锁的操作放在finally块中进行,以保证锁一定被被释放,防止死锁的发生。下面通过代码对比Reentrantlock用于替代synchronized时的区别,以及观察能自动中断锁这一特性:
/*** Reentrantlock用于替代synchronized* 本例中由于m1锁定this,只有m1执行完毕的时候,m2才能执行* 这里是复习synchronized最原始的语义* 可以看到synchronized的可重入性*/
package com.zcm.juc;import java.util.concurrent.TimeUnit;public class T01_ReentrantLock1 {synchronized void m1() {for(int i=0; i<10; i++) {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(i);if(i == 2) m2();}}synchronized void m2() {System.out.println("m2 start");}public static void main(String[] args) {T01_ReentrantLock1 rl = new T01_ReentrantLock1();new Thread(rl::m1).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}new Thread(rl::m2).start();}
}
/*** 使用reentrantlock可以完成同样的功能* 需要注意的是,必须要必须要必须要手动释放锁(重要的事情说三遍)* 使用syn锁定的话如果遇到异常,jvm会自动释放锁,但是lock必须手动释放锁,因此经常在finally中进行锁的释放*/
package com.zcm.juc;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class T02_ReentrantLock2 {Lock lock = new ReentrantLock();void m1() {try {lock.lock(); // 相当于synchronized(this)for (int i = 0; i < 10; i++) {TimeUnit.SECONDS.sleep(1);System.out.println(i);}// 如果需要中断,直接抛出错误,之后走到finally中,就释放锁了} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}void m2() {try {lock.lock();System.out.println("m2 start");} finally {lock.unlock();}}public static void main(String[] args) {T02_ReentrantLock2 rl = new T02_ReentrantLock2();new Thread(rl::m1).start();try {// 保证m1先执行TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}new Thread(rl::m2).start();}
}
2、尝试非阻塞地获取锁
可以通过使用tryLock方法来进行尝试获取锁,因为在使用tryLock的时候,不管使用锁定成功,代码都会继续执行,所以在使用tryLock的时候可以通过方法返回值,或者设置tryLock的时间来判断是否锁定成功。
/*** 使用reentrantlock可以进行“尝试锁定”tryLock,这样无法锁定,或者在指定时间内无法锁定,线程可以决定是否继续等待*/
package com.zcm.juc;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;public class T03_ReentrantLock3 {Lock lock = new ReentrantLock();void m1() {try {lock.lock();for (int i = 0; i < 3; i++) {TimeUnit.SECONDS.sleep(1);System.out.println(i);}} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}/*** 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行* 可以根据tryLock的返回值来判定是否锁定* 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中*/void m2() { /*boolean locked = lock.tryLock();System.out.println("m2是否获取锁" + locked);if(locked) lock.unlock();*/boolean locked = false;try {// 如果在等待时间内无法获取锁,进行进入catchlocked = lock.tryLock(5, TimeUnit.SECONDS);System.out.println("m2是否获取锁" + locked);} catch (InterruptedException e) {e.printStackTrace();} finally {if(locked) lock.unlock();}}public static void main(String[] args) {T03_ReentrantLock3 rl = new T03_ReentrantLock3();new Thread(rl::m1).start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}new Thread(rl::m2).start();}
}
3、等待可中断
当持有锁的线程长期不释放锁的时候,正在等待的线程如果一直等待非常浪费资源,而ReentrantLock可以选择放弃等待,改为处理其他事情。可中断特性对处理执行时间非常长的同步块很有帮助。其实主要就是对interrupt和lockInterruptibly方法的应用
/*** 使用ReentrantLock还可以调用lockInterruptibly方法,可以对线程interrupt方法做出响应,* 在一个线程等待锁的过程中,可以被打断*/
package com.zcm.juc;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;public class T04_ReentrantLock4 {public static void main(String[] args) {Lock lock = new ReentrantLock();Thread t1 = new Thread(()->{try {lock.lock();System.out.println("t1 start");// 模仿长时间不释放锁的情况TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);System.out.println("t1 end");} catch (InterruptedException e) {System.out.println("interrupted!");} finally {lock.unlock();}});t1.start();Thread t2 = new Thread(()->{try {//相当于lock.lock();//可以对interrupt()方法做出响应lock.lockInterruptibly(); System.out.println("t2 start");TimeUnit.SECONDS.sleep(5);System.out.println("t2 end");} catch (InterruptedException e) {System.out.println("interrupted!");} finally {lock.unlock();}});t2.start();try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}//打断线程2的等待t2.interrupt(); }
}
4、设置公平锁
公平锁:是指多个线程在等待同一个锁时,必须按照申请锁的时间顺序来依次获得锁;而非公平锁则不保证这一点,在锁被释放时,任何一个等待锁的线程都有机会获得锁。synchronized中的锁时非公平的,ReentrantLock在默认情况下也是非公平的,但可以通过带布尔值的构造函数要求使用公平锁不过一旦使用了公平锁,将会导致ReentrantLock的性能急剧下降,会明显影响吞吐量。
/*** ReentrantLock还可以指定为公平锁*/
package com.zcm.juc;import java.util.concurrent.locks.ReentrantLock;public class T05_ReentrantLock5 extends Thread {//参数为true表示为公平锁,请对比输出结果,公平锁也不是一定交叉执行//多次尝试//想一下为啥private static ReentrantLock lock=new ReentrantLock(true);public void run() {for(int i=0; i<100; i++) {lock.lock();try{System.out.println(Thread.currentThread().getName()+"获得锁");}finally{lock.unlock();}}}public static void main(String[] args) {T05_ReentrantLock5 rl=new T05_ReentrantLock5();Thread th1=new Thread(rl);Thread th2=new Thread(rl);th1.start();th2.start();}
}
三、CountDownLatch(门栓)
CountDownLatch虽然是一个同步工具,但是CountDownLatch不是锁,主要的作用使一个线程等待其他线程各自执行完毕后再执行,是通过一个计数器来实现的,CountDownLatch在初始化时,需要给定一个整数作为计数器。当调用countDown方法时,计数器会被减1;当调用await方法时,如果计数器大于0时,线程会被阻塞,一直到计数器被countDown方法减到0时,线程才会继续执行。计数器是无法重置的,当计数器被减到0时,调用await方法都会直接返回。
我们要明白重要的一点执行countDown方法的线程不会进行阻塞,执行awit方法的线程才会阻塞。同时也可以设置等待过期时间,等待时间过后开始执行。使用场景和join差不多。
package com.zcm.juc;import java.util.concurrent.CountDownLatch;public class T06_TestCountDownLatch {public static void main(String[] args) {// 对比join和countDownLatchusingJoin();usingCountDownLatch();}/*** @Author zhangchunming* @Description //TODO 使用countDownLatch进行等待**/private static void usingCountDownLatch() {Thread[] threads = new Thread[100];CountDownLatch latch = new CountDownLatch(threads.length);for(int i=0; i<threads.length; i++) {threads[i] = new Thread(()->{int result = 0;for(int j=0; j<10000; j++) result += j;// 计数器减一latch.countDown();});}for (int i = 0; i < threads.length; i++) {threads[i].start();}try {// 阻塞的是当前线程,所以2无法执行下面的输出// 可以加时间 latch.await(3, TimeUnit.SECONDS);latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("end latch");}private static void usingJoin() {Thread[] threads = new Thread[100];for(int i=0; i<threads.length; i++) {threads[i] = new Thread(()->{int result = 0;for(int j=0; j<10000; j++) result += j;});}for (int i = 0; i < threads.length; i++) {threads[i].start();}for (int i = 0; i < threads.length; i++) {try {threads[i].join();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println("end join");}
}
四、cyclicBarrier(栅栏)
其实CyclicBarrier和CountDownLatch非常容易混淆,CountDownLatch他是阻塞一个线程,等待其他线程执行完阻塞线程在执行,而CyclicBarrier是阻塞一个线程组,在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await方法将自己阻塞,此时计数器会减1,当计数器减为0的时候所有因调用await方法而被阻塞的线程将被唤醒。
package com.zcm.juc;import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;public class T07_TestCyclicBarrier {public static void main(String[] args) {//三种写法//CyclicBarrier barrier = new CyclicBarrier(20);CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("满人"));/*CyclicBarrier barrier = new CyclicBarrier(20, new Runnable() {@Overridepublic void run() {System.out.println("满人,发车");}});*/for(int i=0; i<100; i++) {new Thread(()->{try {// 发现每次满20个线程才会执行,而且可以重复使用barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();}}
}
五、phaser(可以控制的栅栏)
Phaser是一个灵活的线程同步工具,他包含了CyclicBarrier和CountDownLatch的相关功能。
Phaser替代CountDownLatch。对于CountDownLatch而言,有2个重要的方法,一个是await()方法,可以使线程进入等待状态,在Phaser中,与之对应的方法是awaitAdvance(int n)。CountDownLatch中另一个重要的方法是countDown(),使计数器减一,当计数器为0时所有等待的线程开始执行,在Phaser中,与之对应的方法是arrive()。
同时Phaser也可以理解为一个可以控制的栅栏,CyclicBarrier的await()方法可以直接用Phaser的arriveAndAwaitAdvance()方法替代。
package com.zcm.juc;import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
// 代码演示替换cyclicBarrier
public class T08_TestPhaser {static Random r = new Random();static MarriagePhaser phaser = new MarriagePhaser();//每次写这个太麻烦,直接提取出来static void milliSleep(int milli) {try {TimeUnit.MILLISECONDS.sleep(milli);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {phaser.bulkRegister(5);for(int i=0; i<5; i++) {final int nameIndex = i;new Thread(()->{Person p = new Person("person " + nameIndex);p.arrive();// 相当于awitphaser.arriveAndAwaitAdvance();p.eat();phaser.arriveAndAwaitAdvance();p.leave();phaser.arriveAndAwaitAdvance();}).start();}}static class MarriagePhaser extends Phaser {/** boolean onAdvance(int phase, int registeredParties)方法。此方法有2个作用:* 1、当每一个阶段执行完毕,此方法会被自动调用,因此,重载此方法写入的代码会在每个阶段执行完毕时执行,相当于CyclicBarrier的barrierAction。* 2、当此方法返回true时,意味着Phaser被终止,因此可以巧妙的设置此方法的返回值来终止所有线程。* phase表示执行阶段,registeredParties表示管理的线程**/@Overrideprotected boolean onAdvance(int phase, int registeredParties) {switch (phase) {case 0:System.out.println("所有人到齐了!");return false;case 1:System.out.println("所有人吃完了!");return false;case 2:System.out.println("所有人离开了!");System.out.println("婚礼结束!");return true;default:return true;}}}static class Person {String name;public Person(String name) {this.name = name;}public void arrive() {milliSleep(r.nextInt(1000));System.out.printf("%s 到达现场!\n", name);}public void eat() {milliSleep(r.nextInt(1000));System.out.printf("%s 吃完!\n", name);}public void leave() {milliSleep(r.nextInt(1000));System.out.printf("%s 离开!\n", name);}}
}
可以随时控制phaser 的大小,上边通过bulkRegister方法指定,下边可以看一下通过register注册新的phaser ,就是相当于+1,。
package com.zcm.juc;import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;public class T09_TestPhaser2 {static Random r = new Random();static MarriagePhaser phaser = new MarriagePhaser();static void milliSleep(int milli) {try {TimeUnit.MILLISECONDS.sleep(milli);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {phaser.bulkRegister(7);for(int i=0; i<5; i++) {new Thread(new Person("p" + i)).start();}new Thread(new Person("新郎")).start();new Thread(new Person("新娘")).start();}static class MarriagePhaser extends Phaser {@Overrideprotected boolean onAdvance(int phase, int registeredParties) {switch (phase) {case 0:System.out.println("所有人到齐了!" + registeredParties);System.out.println();return false;case 1:System.out.println("所有人吃完了!" + registeredParties);System.out.println();return false;case 2:System.out.println("所有人离开了!" + registeredParties);System.out.println();return false;case 3:System.out.println("婚礼结束!新郎新娘抱抱!" + registeredParties);return true;default:return true;}}}static class Person implements Runnable {String name;public Person(String name) {this.name = name;}public void arrive() {milliSleep(r.nextInt(1000));System.out.printf("%s 到达现场!\n", name);phaser.arriveAndAwaitAdvance();}public void eat() {milliSleep(r.nextInt(1000));System.out.printf("%s 吃完!\n", name);phaser.arriveAndAwaitAdvance();}public void leave() {milliSleep(r.nextInt(1000));System.out.printf("%s 离开!\n", name);phaser.arriveAndAwaitAdvance();}private void hug() {if(name.equals("新郎") || name.equals("新娘")) {milliSleep(r.nextInt(1000));System.out.printf("%s 洞房!\n", name);phaser.arriveAndAwaitAdvance();} else {//观察加一和减一的区别:发现加一的时候最后不会打印:婚礼结束,说明当前阶段没有完毕//phaser.arriveAndDeregister();phaser.register();}}//每个线程在启动的时候调用一下方法@Overridepublic void run() {arrive();eat();leave();hug();}}
}
六、ReadWriteLock(读写锁)
其实就是共享锁和排它锁,ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。
所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容。
读写锁比互斥锁允许对于共享数据更大程度的并发。每次只能有一个写线程,但是同时可以有多个线程并发地读数据。ReadWriteLock适用于读多写少的并发情况。
package com.zcm.juc;import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;public class T10_TestReadWriteLock {static Lock lock = new ReentrantLock();private static int value;static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();static Lock readLock = readWriteLock.readLock();static Lock writeLock = readWriteLock.writeLock();public static void read(Lock lock) {try {lock.lock();Thread.sleep(1000);System.out.println("read over!");//模拟读取操作} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void write(Lock lock, int v) {try {lock.lock();Thread.sleep(1000);value = v;System.out.println("write over!");//模拟写操作} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}public static void main(String[] args) {//Runnable readR = ()-> read(lock);Runnable readR = ()-> read(readLock);//Runnable writeR = ()->write(lock, new Random().nextInt());Runnable writeR = ()->write(writeLock, new Random().nextInt());//通过观察执行时间for(int i=0; i<18; i++) new Thread(readR).start();for(int i=0; i<2; i++) new Thread(writeR).start();}
}
七、Semphore(指示灯)
可以设置几个线程同时执行,设置俩个的话,就是等这俩个线程执行完之后,其他才能执行,可以实现限流,同时可以设置公平锁和非公平锁。
package com.zcm.juc;import java.util.concurrent.Semaphore;public class T11_TestSemaphore {public static void main(String[] args) {//Semaphore s = new Semaphore(2);//设置许可数和公平锁Semaphore s = new Semaphore(2, true);//允许一个线程同时执行//Semaphore s = new Semaphore(1);new Thread(()->{try {//从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。//就是上边的2-1s.acquire();System.out.println("T1 running...");Thread.sleep(200);System.out.println("T1 running...");System.out.println("查看可用许可数:"+s.availablePermits());System.out.println("查看是否有线程在等待许可:"+s.hasQueuedThreads());} catch (InterruptedException e) {e.printStackTrace();} finally {//释放一个许可,将其返回给信号量。//+1s.release();}}).start();new Thread(()->{try {s.acquire();System.out.println("T2 running...");Thread.sleep(200);System.out.println("T2 running...");s.release();} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}
八、Exchanger(交换)
Exchanger用于线程间进行通信、数据交换。Exchanger提供了一个同步点exchange方法,两个线程调用exchange方法时,无论调用时间先后,两个线程会互相等到线程到达exchange方法调用点,此时两个线程可以交换数据,将本线程产出数据传递给对方。
package com.zcm.juc;import java.util.concurrent.Exchanger;public class T12_TestExchanger {static Exchanger<String> exchanger = new Exchanger<>();public static void main(String[] args) {new Thread(()->{String s = "T1";try {//阻塞,等待交换,没在俩个以上用过s = exchanger.exchange(s);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " " + s);}, "t1").start();new Thread(()->{String s = "T2";try {s = exchanger.exchange(s);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " " + s);}, "t2").start();}
}
九、LockSupport(可重入锁)
里边存在各种方法,比如park()阻塞线程,unpark(线程)叫醒某个阻塞线程,而且unpark可以先于park使用,这个和wait不一样。
LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可,许可只有两个值1和0,默认是0,可以把许可看成是一种(0,1)信号量(Semaphore),但与Semaphore不同的是,许可的累加上限是1,LockSupport是一个线程阻塞工具类,所有的方法都是静态方法,可以在线程任意位置阻塞,阻塞之后有对应的唤醒方法.LockSupport内部调用Unsafe类的native方法。
同时面试经常问的俩个问题如下:
为什么可以先唤醒线程后阻塞线程? 因为unpark获得了一个许可,之后再调用park方法,就能直接使用许可消费,不会阻塞,跟唤醒和阻塞的顺序无关。
为什么唤醒两次后阻塞两次,最终结果还是会阻塞线程? 因为许可的数量最多为1,连续调用两次unpark和调用一次unpark的效果一样,许可的数量为1.而调用两次park需要消费两个许可,许可不够,不能放行,所以阻塞。
package com.zcm.juc;import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;public class T13_TestLockSupport {public static void main(String[] args) {Thread t = new Thread(()->{for (int i = 0; i < 10; i++) {System.out.println(i);if(i == 5) {//发现在i=5的时候线程阻塞,等待唤醒LockSupport.park();}try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});t.start();try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("after 8 senconds!");LockSupport.unpark(t);}
}