生产者和消费者
目录
- 生产者和消费者
- 1.什么是生产者和消费者
- 2.生产者和消费者(不加唤醒机制)
- 3.生产者和消费者(加唤醒机制)
- 4.解决虚假唤醒
- 5.使用lock锁
- 6.面试题
1.什么是生产者和消费者
在日常生活中,我们去商店买东西,我们就是消费者,商店里面的某件商品有可能被卖完,那么这件商品就要生产(即进货),要保持两者统一。不能说顾客去买东西,商店里面没有就没有了,当商店里面没有东西之后就要去生产(进货),商店也是有限容量的,一件商品不能放太多,有个固定容量,生产的时候不能超过这个容量。
2.生产者和消费者(不加唤醒机制)
public class ProductorAndConsumer {public static void main(String[] args) {Market market = new Market();Productor productor = new Productor(market);Consumer consumer = new Consumer(market);new Thread(productor,"生产者A").start();new Thread(consumer,"消费者B").start();}
}//商店
class Market{//某件商品数量,最开始为0private int product=0;//进货方法,在多线程环境下,如果不加锁会产生线程安全问题,这里加synchronized锁public synchronized void get(){//限定商店容量为10if(product>=10){System.out.println("仓库已满!");}else{System.out.println(Thread.currentThread().getName()+"进货成功!-->"+ ++product);}}//出售方法public synchronized void sale(){if(product<=0){System.out.println("已售罄!");}else {System.out.println(Thread.currentThread().getName()+"出售成功-->"+ --product);}}
}//生产者,生产者不可能只有一个,所以是多线程的
class Productor implements Runnable{private Market market;public Productor(Market market){this.market=market;}@Overridepublic void run() {//一次买15个for (int i=0;i<15;i++){market.get();}}
}//消费者
class Consumer implements Runnable{private Market market;public Consumer(){}public Consumer(Market market){this.market=market;}@Overridepublic void run() {//一次买10个for (int i=0;i<10;i++){market.sale();}}
}
当不加唤醒机制的生产者和消费者模式,出现售罄情况时不会立即去生产,出现仓库已满情况时也不会立即去出售,如下:
3.生产者和消费者(加唤醒机制)
public class ProductorAndConsumer {public static void main(String[] args) {Market market = new Market();Productor productor = new Productor(market);Consumer consumer = new Consumer(market);new Thread(productor,"生产者A").start();new Thread(consumer,"消费者B").start();}
}//商店
class Market{//某件商品数量,最开始为0private int product=0;//进货方法,在多线程环境下,如果不加锁会产生线程安全问题,这里加synchronized锁public synchronized void get(){//限定商店容量为10if(product>=10){System.out.println("仓库已满!");//当仓库已满,需要停止生产try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}else{System.out.println(Thread.currentThread().getName()+"进货成功!-->"+ ++product);//当进货成功,就需要唤醒this.notifyAll();}}//出售方法public synchronized void sale(){if(product<=0){System.out.println("已售罄!");//售罄之后需要停止去生产try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}else {System.out.println(Thread.currentThread().getName()+"出售成功-->"+ --product);//出售成功之后需要生产this.notifyAll();}}
}//生产者,生产者不可能只有一个,所以是多线程的
class Productor implements Runnable{private Market market;public Productor(Market market){this.market=market;}@Overridepublic void run() {//一次买15个for (int i=0;i<15;i++){market.get();}}
}//消费者
class Consumer implements Runnable{private Market market;public Consumer(){}public Consumer(Market market){this.market=market;}@Overridepublic void run() {//一次买10个for (int i=0;i<10;i++){market.sale();}}
}
以上代码看起来没有什么问题,但是在实际运行时程序没有停止,如下图:
分析问题原因,当生产者循环到最后一次时,消费者还有两次循环,当商品售罄之后,消费者会wait(),而生产者的循环已经结束,那么程序就一直会卡在wait(),不会结束。
4.解决虚假唤醒
为了解决3出的问题,那么就要让消费者的最后一次循环中可以执行notifyAll()方法,这样才可以结束。
那么就可以把else去掉,这样就可以使消费者最后一次执行结束,代码如下:
public class ProductorAndConsumer {public static void main(String[] args) {Market market = new Market();Productor productor = new Productor(market);Consumer consumer = new Consumer(market);new Thread(productor,"生产者A").start();new Thread(consumer,"消费者B").start();new Thread(productor,"生产者C").start();new Thread(consumer,"消费者D").start();}
}//商店
class Market{//某件商品数量,最开始为0private int product=0;//进货方法,在多线程环境下,如果不加锁会产生线程安全问题,这里加synchronized锁public synchronized void get(){//限定商店容量为10while(product>10){System.out.println("仓库已满!");//当仓库已满,需要停止生产try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+"进货成功!-->"+ ++product);//当进货成功,就需要唤醒this.notifyAll();}//出售方法public synchronized void sale(){while(product<=0){System.out.println("已售罄!");//售罄之后需要停止去生产try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+"出售成功-->"+ --product);//出售成功之后需要生产this.notifyAll();}
}//生产者,生产者不可能只有一个,所以是多线程的
class Productor implements Runnable{private Market market;public Productor(Market market){this.market=market;}@Overridepublic void run() {//一次买15个for (int i=0;i<15;i++){market.get();}}
}//消费者
class Consumer implements Runnable{private Market market;public Consumer(){}public Consumer(Market market){this.market=market;}@Overridepublic void run() {//一次买10个for (int i=0;i<10;i++){market.sale();}}
}
在修改以上代码中还出现了”虚假唤醒“的问题,即存在多个生产者和消费者的时候,当商店没有商品时,两个消费者都停在了wait()方法,然后生产者进行生产,生产一件之后,会执行notifyAll()方法,那么两个消费者都会去执行,就会出现商品不够的现象。
为了解决以上问题,将if判断改成了while判断,生产者notifyAll()之后,消费者还需再次判断商品是否足够,不够继续wait()。
5.使用lock锁
之前我们使用的是synchronized锁,我们现在改为lock同步锁进行替换。
【注意】:
- 使用lock锁的时候,必须要对其进行释放,一般放在finally中
- 使用lock锁之后,需要使用Condition进行唤醒操作
- wait()方法变为await()方法
- notify()变为singnal()方法
- notifyAll()变为signalAll()方法
public class ProductorAndConsumer {public static void main(String[] args) {Market market = new Market();Productor productor = new Productor(market);Consumer consumer = new Consumer(market);new Thread(productor,"生产者A").start();new Thread(consumer,"消费者B").start();new Thread(productor,"生产者C").start();new Thread(consumer,"消费者D").start();}
}//商店
class Market{//某件商品数量,最开始为0private int product=0;private Lock lock=new ReentrantLock();private Condition condition=lock.newCondition();//进货方法,在多线程环境下,如果不加锁会产生线程安全问题,这里加synchronized锁public void get(){lock.lock();try{//限定商店容量为10while(product>10){System.out.println("仓库已满!");//当仓库已满,需要停止生产try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+"进货成功!-->"+ ++product);//当进货成功,就需要唤醒condition.signalAll();}finally {lock.unlock();}}//出售方法public synchronized void sale(){lock.lock();try{while(product<=0){System.out.println("已售罄!");//售罄之后需要停止去生产try {condition.await();} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+"出售成功-->"+ --product);//出售成功之后需要生产condition.signalAll();}finally {lock.unlock();}}
}//生产者,生产者不可能只有一个,所以是多线程的
class Productor implements Runnable{private Market market;public Productor(Market market){this.market=market;}@Overridepublic void run() {//一次买15个for (int i=0;i<15;i++){market.get();}}
}//消费者
class Consumer implements Runnable{private Market market;public Consumer(){}public Consumer(Market market){this.market=market;}@Overridepublic void run() {//一次买10个for (int i=0;i<10;i++){market.sale();}}
}
6.面试题
使用多线程实现先打印线程AA打印”AA“5次,线程BB打印”BB“10次,线程CC打印”CC“15次,总共打印10轮。
【使用Lock中的Condition】
public class test {private static Integer a = 1;public static void main(String[] args) {/*** 线程的创建方式:* 1.new Thread* 2.Callable* 3.Runnable* 4.线程池*/add add = new add();new Thread(()->{for (int i=0;i<10;i++){add.printAA(i);}},"AA").start();new Thread(()->{for (int i=0;i<10;i++){add.printBB(i);}},"BB").start();new Thread(()->{for (int i=0;i<10;i++){add.printCC(i);}},"CC").start();}
}class add{private int flag=1;private Lock lock=new ReentrantLock();Condition condition1=lock.newCondition();Condition condition2=lock.newCondition();Condition condition3=lock.newCondition();//打印AAvoid printAA(int loop){lock.lock();try{while (flag!=1){try {condition1.await();} catch (InterruptedException e) {e.printStackTrace();}}for (int i = 0; i < 5; i++) {System.out.println("AA"+"轮数:"+(loop+1));}flag++;condition2.signal();}finally {lock.unlock();}}//打印BBvoid printBB(int loop){lock.lock();try{while (flag!=2){try {condition2.await();} catch (InterruptedException e) {e.printStackTrace();}}for (int i = 0; i < 10; i++) {System.out.println("BB"+"轮数:"+(loop+1));}flag++;condition3.signal();}finally {lock.unlock();}}//打印CCvoid printCC(int loop){lock.lock();try{while (flag!=3){try {condition3.await();} catch (InterruptedException e) {e.printStackTrace();}}for (int i = 0; i < 15; i++) {System.out.println("CC"+"轮数:"+(loop+1));}flag=1;condition1.signal();}finally {lock.unlock();}}}
【使用Object中的wait、notify、notifyAll】
public class test {private static Integer a = 1;public static void main(String[] args) {/*** 线程的创建方式:* 1.new Thread* 2.Callable* 3.Runnable* 4.线程池*/add add = new add();new Thread(()->{for (int i=0;i<10;i++){add.printAA(i);}},"AA").start();new Thread(()->{for (int i=0;i<10;i++){add.printBB(i);}},"BB").start();new Thread(()->{for (int i=0;i<10;i++){add.printCC(i);}},"CC").start();}
}class add{private int flag=1;//打印AAsynchronized void printAA(int loop){while (flag!=1){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}flag++;for (int i = 0; i < 5; i++) {System.out.println("AA"+"轮数:"+(loop+1));}this.notifyAll();}//打印BBsynchronized void printBB(int loop){while (flag!=2){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}flag++;System.out.println("BB"+"轮数:"+(loop+1));this.notifyAll();}//打印CCsynchronized void printCC(int loop){while (flag!=3){try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}flag=1;System.out.println("CC"+"轮数:"+(loop+1));this.notifyAll();}}