目录
啥是生产者消费者模型?
生产者消费者模型存在问题??如何进行解决呢??
生产者消费者模型导致的问题
什么是阻塞队列
生产者消费者模型优点
生产者消费者模型实现
Message
MessageQueue
获取消息get方法
生产消息take方法
测试生产者消费者模型
啥是生产者消费者模型?
生产者消费者模型是多线程的一个经典案例,在多个线程并发执行时,会有线程来生产数据这就是生产者,同时还有线程来消费数据这是消费者,生产者和消费者共同完成多线程并发任务.
生产者消费者模型存在问题??如何进行解决呢??
生产者消费者模型导致的问题
- 生产者和消费者在进行协作时必然会出现一些问题,比如生产者生产数据过快,而消费者消费数据过慢就导致生产者会处于阻塞等待,同理消费者消费数据过快,而生产者生产过慢,就会导致生产者会阻塞等待.
- 生产者和消费者在进行协作时,如果有一个出现异常,就会影响另外一个.
如何理解 "解耦合" ???
耦合性比较高 -- 两者关联性非常大的
耦合性比较低 -- 两者关联性较低 ---- 日常开发中我们要遵循 "高内聚","低耦合"
所谓高内聚 : 进行分类,让其更加几种管理,分模块
左图所示 :
两个服务器之间直接进行交互,这就是耦合性高的体现,如果服务器A出现异常(或者挂掉)就会导致服务器B挂掉,同理服务器B出现异常或者挂掉,对服务器A也有影响
有图所示:
两个服务器之间不直接进行交互,这是耦合性低的体现.一个服务器异常不会影响另外一个服务器.两个服务器依靠阻塞队列进行交互
如何理解"削峰减谷" 或者 如何理解"平衡生产者和消费者的处理能力" ???
对于左图:
如果大量客户端在同一时刻一起访问服务器A,这就造成服务器A崩掉,也会影响到服务器B也崩掉
对于有图:
如果大量客户端同一时刻访问服务器A,服务器A会将数据全部放到阻塞队列中,服务器B以正常的方式与阻塞队列进行交互,将其压力给阻塞队列,不会让其服务器一次性的就崩了.
什么是阻塞队列
阻塞队列:阻塞队列能够保证线程安全-------最重要的应用场景: 生产者消费者模型
满足:
- 当队列满时,尝试入队列,会进行阻塞
- 当队列空时,尝试出对列,会进行阻塞
生产者消费者模型优点
- 阻塞队列能使得生产者和消费者之间的解耦
- 阻塞队列相当于缓冲区,平衡生产者和消费者的处理能力
生产者消费者模型实现
生产者消费者模式与保护性暂停模式有区别:
- 保护性暂停模式需要生产结果的线程与消费结果的线程要一一对应的..
- 而生产者消费者模式不需要两个线程一一对应,可以理解为异步,产生结果和接收结果有延迟.结果产生了并不是立即拿到.
我们这个生产者消费者模型使用一个队列(可以称为消息队列) 来实现的,这个消息队列对生产者和消费者进行解耦,来达到平衡生产者和消费者的线程资源.(平衡生产者和消费者的处理能力)
生产者只负责产生结果数据,不关心数据如何去处理,消费者只负责专心消费结果数据,不需要关心数据如何生产的.
这个消息队列有容量限制:
- 当队列满的时候,生产者就不会在加入数据,会阻塞住,就会等待消费者来消费数据.然后通知生产者生产数据.
- 当队列为空的时候,消费者不会在消费数据,会阻塞住,等待生产者生产数据之后通知消费者.
JDK中的阻塞队列就是基于生产者消费者模式来实现的
Message
/*** 消息->不能被子类覆盖, 只有get方法->只能获取到消息id/val*/
public final class Message {private int id;private String val;public Message(int id, String val) {this.id = id;this.val = val;}public int getId() {return id;}public String getVal() {return val;}
}
这个Message属于消息类,也就是生产者和消费者传递的消息.
这里特意加了final表示不能有子类,当然也不能被子类重新,也就是说生产者和消费者传递的消息是不可以被改变的.
同时也去掉了这个Message类的set方法,也表示不能修改整个消息
MessageQueue
@Slf4j(topic = "c.MessageQueue")
public class MessageQueue {//用一个集合来生产和消费消息-->从尾部生产,从头部消费private LinkedList<Message> list = new LinkedList<>();//消息队列容量private int capacity;public MessageQueue(int capacity) {this.capacity = capacity;}//获取消息-->消费者public Message get(){//如果消息队列为空就一直等待不能消费synchronized (list) {while(list.isEmpty()){try {log.debug("消息队列已空,消费者阻塞");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}//消费掉Message message = list.removeFirst();log.debug("已消费消息");list.notifyAll();return message;}}//存入消息-->生产者public void take(Message message){synchronized (list) {while(list.size()==capacity) {try {log.debug("消息队列已满,生产者阻塞");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.addLast(message);log.debug("已生产消息");list.notifyAll();}}
}
MessageQueue成员解析
//用一个集合来生产和消费消息-->从尾部生产,从头部消费
private LinkedList<Message> list = new LinkedList<>();
//消息队列容量
private int capacity;
//为整个消息队列初始化容量
public MessageQueue(int capacity) {this.capacity = capacity;
}
获取消息get方法
public Message get(){//如果消息队列为空就一直等待不能消费synchronized (list) {while(list.isEmpty()){try {log.debug("消息队列已空,消费者阻塞");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}//消费掉Message message = list.removeFirst();log.debug("已消费消息");list.notifyAll();return message;}
}
get方法由消费者执行,消费者来获取消息,也就是消费消息.
因为生产者和消费者不止一个,也就是多个线程,所以肯定会有线程安全问题->使用的集合不是线程安全的.
所以我们就要为线程共享的作为锁对象,这里拿list作为锁对象. 为它加锁.
while(list.isEmpty()){try {log.debug("消息队列已空,消费者阻塞");list.wait();} catch (InterruptedException e) {e.printStackTrace();}
}
消费者先要判断队列是否为空,如果队列为空就不进行消费了,就等待,等待生产者生产完消息之后通知消费者继续消费.
//消费掉
Message message = list.removeFirst();
log.debug("已消费消息");
list.notifyAll();
return message;
否则的话,如果队列不为空,消费者就可以消费消息了,消费完消息就通知生产者可以生产消息了.
生产消息take方法
//存入消息-->生产者
public void take(Message message){synchronized (list) {while(list.size()==capacity) {try {log.debug("消息队列已满,生产者阻塞");list.wait();} catch (InterruptedException e) {e.printStackTrace();}}list.addLast(message);log.debug("已生产消息");list.notifyAll();}
}
生产消息的方法由生产者执行,,首先还是对多个线程共享的队列上锁.
- 生产者这边先要判断队列是否满了,如果满了,生产者就不在生产消息,进行wait等待
- 如果队列没有满,极具生产消息,生产完消息之后就通知消费者可以消费消息了
测试生产者消费者模型
public class TestMain {public static void main(String[] args) throws InterruptedException {MessageQueue queue = new MessageQueue(2);//生产者线程for(int i =0;i<3;++i) {int id = i;new Thread(()->{queue.take(new Message(id,"message" + id));},"生产者" + i).start();}Thread.sleep(1000);//消费者线程-->消费者不断地消费new Thread(()->{while(true) {Message message = queue.get();}},"消费者").start();}
}
- 首先创建容量为2的阻塞队列.
- 创建三个生产者线程来生产消息.
- 创建一个消费者线程不断地消费消息.
我们看执行结果.
先是生产者0和生产者1进行生产消息,然后当生产2生产消息的时候,队列就已经满了,生产者阻塞住,接下来就会通知消费者消费消息,当消费了2个消息之后,在消费一个消息时,队列为空,就会阻塞住,因为刚才生产者2线程阻塞住了,这时消费者那边通知队列为空,所以生产者2在生产一个消息,消费者消费完之后在消费,队列就为空了,阻塞住了.
因为就创建3个线程来生产消息,所以最终就阻塞住,没有消息生产了,消费者也就不能消费消息了