什么是生产者消费者模式
简单来说,生产者消费者模式就是缓冲区。
那么这么做有两个好处,一个是解耦,第二个是平衡生产能力和消费能力的差,因为生产者和消费者的速度是不一样的,有了这个缓冲区就可以平衡这样一个落差,达到动态平衡。
那么这个缓冲区其实就是一个队列,它的规则就是当队列是满的时候,生产者会被阻塞。当队列为空的时候,消费者会被阻塞, 在java中实现生产者消费者模式有多种方式,主要是线程间的通信,这里介绍三种:wait/notify、await/signal和阻塞队列。
wait/notify
wait的时候阻塞当前线程,然后释放锁,这时候消费者就有机会抢占到锁了,代码如下:
//waitnotify模式的生产者消费者,泛型类
public class WaitNotifyProducerConsumer<T> {//储存元素的队列public ArrayList<T> list = new ArrayList<>();//队列最大长度5private int maxSize = 5;//生产者public void put(T item) throws InterruptedException {//锁住synchronized (list) {//如果已经达到5了,证明满了,阻塞线程,释放锁if (list.size() == maxSize) {list.wait();}//添加队列元素list.add(item);System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + list.size() + "个元素");//通知消费者list.notify();}}public T take() throws InterruptedException {//锁住,和生产者共用一把锁synchronized (list) {//如果队列为空,阻塞线程释放锁,等待生产者放进去了唤醒if (list.isEmpty()) {list.wait();}//队列,先进先出,取出第一个元素T item = list.remove(0);System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + list.size() + "个元素");//通知生产者list.notify();return item;}}public static void main(String[] args) throws InterruptedException {WaitNotifyProducerConsumer<String> wnpc = new WaitNotifyProducerConsumer<>();//生产者线程,十轮Thread t1 = new Thread(() -> {Random random = new Random();for (int i = 1; i <= 10; i++) {//元素String item = "item-" + i;try {//如果队列满了,put会阻塞wnpc.put(item);//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();//让生产者先启动生产数据Thread.sleep(1000);Thread t2 = new Thread(() -> {Random random = new Random();//自旋for (; ; ) {try {//消费元素String item = wnpc.take();//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t2.start();}
}
执行一下,生产者和消费者轮流生产消费元素。
await/signal
await和signal是Condition里面的,作用和synchronized一样,await让线程阻塞,并且释放锁,signal唤醒阻塞的线程,代码如下:
//Condition实现生产者消费者模式,泛型类
public class ConditionProducerConsumer<T> {//锁private Lock lock = new ReentrantLock();//储存元素的队列public ArrayList<T> list = new ArrayList<>();//队列最大长度5private int maxSize = 5;//使用两个条件队列condition来实现精确通知,生产者conditionprivate final Condition producerCondition = lock.newCondition();//消费者conditionprivate final Condition consumerCondition = lock.newCondition();//生产者public void put(T item) throws InterruptedException {//锁住lock.lock();try {//如果队列已满,阻塞当前线程,添加到生产者Condition等待队列,释放锁if (list.size() == maxSize) {producerCondition.await();}//添加元素list.add(item);System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + list.size() + "个元素");//唤醒消费者Condition队列阻塞的线程consumerCondition.signal();} finally {lock.unlock();}}public T take() throws InterruptedException {//加锁lock.lock();try {//如果队列为空,释放锁,阻塞线程,添加到消费者等待队列if (list.isEmpty()) {consumerCondition.await();}//取出第一个元素T item = list.remove(0);System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + list.size() + "个元素");//唤醒生产者队列阻塞的线程producerCondition.signal();return item;} finally {lock.unlock();}}public static void main(String[] args) throws InterruptedException {ConditionProducerConsumer<String> cpc = new ConditionProducerConsumer<>();//生产者线程,十轮Thread t1 = new Thread(() -> {Random random = new Random();for (int i = 1; i <= 10; i++) {//元素String item = "item-" + i;try {//如果队列满了,put会阻塞cpc.put(item);//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();//让生产者先启动生产数据Thread.sleep(1000);Thread t2 = new Thread(() -> {Random random = new Random();//自旋for (; ; ) {try {//消费元素String item = cpc.take();//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t2.start();}}
运行一下:
阻塞队列
JUC中的阻塞队列
- ArrayBlockingQueue 基于数组结构
- LinkedBlockingQueue 基于链表结构
- PriorityBlcokingQueue 基于优先级队列
- DelayQueue 允许延时执行的队列
- SynchronousQueue 没有任何存储结构的的队列,它的应用场景是newCachedThreadPool()线程池。可以处理非常大请求的任务,1000个任务过来,那么线程池需要分配1000个线程来执行。关于线程池的原理可以查看《线程池源码精讲》 。
阻塞队列中的方法
添加元素
- add -> 如果队列满了,抛出异常
- offer -> true/false , 添加成功返回true,否则返回false
- put -> 如果队列满了,则一直阻塞
- offer(timeout) -> 带了一个超时时间。如果添加一个元素,队列满了,此时会阻塞timeout时长,超过阻塞时长,返回false。
移除元素
- element-> 队列为空,抛异常
- peek -> true/false , 移除成功返回true,否则返回false
- take -> 一直阻塞
- poll(timeout) -> 如果超时了,还没有元素,则返回null
实现生产者消费者
上面用Condition中的await和signal其实是就是在手写阻塞队列,但是我们可以不用这么麻烦,直接用java中封装好的阻塞队列,这些阻塞队列的源码点进去看其实用的就是await和signal,只不过封装好的代码比我们自己写的代码要健壮。我们这里用的是阻塞队列提供的put()和take()方法,代码如下:
public class BlockingQueueProducerConsumer<T> {//阻塞队列,长度是5BlockingQueue<T> blockingQueue = new ArrayBlockingQueue(5);//生产者,调阻塞队列的put方法public void put(T item) throws InterruptedException {blockingQueue.put(item);System.out.println(Thread.currentThread().getName() + "生产元素" + item + ",队列里还剩" + blockingQueue.size() + "个元素");}//消费者,调阻塞队列的take方法public T take() throws InterruptedException {T item = blockingQueue.take();System.out.println(Thread.currentThread().getName() + "消费元素" + item + ",队列里还剩" + blockingQueue.size() + "个元素");return item;}public static void main(String[] args) throws InterruptedException {BlockingQueueProducerConsumer<String> bqpc = new BlockingQueueProducerConsumer<>();//生产者线程,十轮Thread t1 = new Thread(() -> {Random random = new Random();for (int i = 1; i <= 10; i++) {//元素String item = "item-" + i;try {//如果队列满了,put会阻塞bqpc.put(item);//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t1.start();//让生产者启动生产数据Thread.sleep(1000);Thread t2 = new Thread(() -> {Random random = new Random();//自旋for (; ; ) {try {//消费元素String item = bqpc.take();//随机休息Thread.sleep(random.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}});t2.start();}
}
运行一下:
我们可以看到用java中自带的阻塞队列实现生产者消费者是最简单的,两行代码搞定,但是另外两种写法也得懂,主要是线程间的通信,毕竟要知其然知其所以然,最后感谢大家收看~