Producer-Consumer模式在很多场景种都有相应的体现,比如线程池,对象池,MQ等,Pro-Con的本质是在生产者与消费者之间引入一个通道(Channel暂且理解为一个队列),该通道主要用于 ①控制生产者与消费者的相对速率,尽可能地保证生产的Product尽快被消费,另一方面②对二者进行解耦 ,生产者和消费者在各自的线程中,通过Channel连接,二者无直接关联。
Producer-Consumer中的角色
1.Product:生产者线程锁需要的产品
2.Producer:负责生产Product,并将其放入到队列Channel中
3.Consumer:从队列Channel中获取对应的产品,获取之后对其进行业务处理
4.Channel:这是最重要的概念,Channel就是二者共享的区域,Channel有着调控生产者与消费者相对速率的功能,内存缓存区也是生产者-消费者模式的核心组件
Channel的调控功能
- Vp>Vc : 生产者放缓速度,反映到代码就是 生产者线程休眠
- Vp<Vc: 消费者放缓速度,反映到代码就是 消费者线程休眠
需要注意的是,不是说比较下速度就立即休眠,而是有更具体的策略
Channel的实现方案
BlockingQueue
阻塞队列是一种常见的实现方案,在JDK中提供了多种BlockingQueue的数据结构。
1 | public interface BlockingQueue<E> extends Queue<E> { |
2 | boolean add(E var1); |
3 | |
4 | boolean offer(E var1); |
5 | |
6 | void put(E var1) throws InterruptedException; |
7 | |
8 | boolean offer(E var1, long var2, TimeUnit var4) throws InterruptedException; |
9 | |
10 | E take() throws InterruptedException; |
11 | |
12 | E poll(long var1, TimeUnit var3) throws InterruptedException; |
13 | |
14 | int remainingCapacity(); |
15 | |
16 | boolean remove(Object var1); |
17 | |
18 | boolean contains(Object var1); |
19 | |
20 | int drainTo(Collection<? super E> var1); |
21 | |
22 | int drainTo(Collection<? super E> var1, int var2); |
23 | } |
BlockingQueue的核心方法
方法 | 抛出异常 | 返回特殊值 | 超时退出 | 一直阻塞 |
---|---|---|---|---|
入队方法 | add(e) | offer(e) | offer(e,time,unit) | put(e) |
出队方法 | remove() | poll() | poll(time,unit) | take() |
另外还有l两类函数,了解一下:
int drainTo(Collection<? super E> c);: 移除此队列中所有可用的元素,并将它们添加到给定collection 中。此操作可能比反复轮询此队列更有效。在试图向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时。如果试图将一个队列放入自身队列中,则会导致 IllegalArgumentException异常。此外,如果正在进行此操作时修改指定的 collection,则此操作行为是不确定的
int drainTo(Collection<? super E> c, int maxElements): 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定collection中,别的行为与上面int drainTo(Collection<? super E> c)一样。
E peek(): 获取但不移除此队列的头元素;如果此队列为空,则返回 null
BlockingQueue成员类(实现类)
1.ArrayBlockingQueue
基于数组的阻塞队列的实现,在ArrayBlockingQueue内部维护了一个定长数组,以便缓存队列中的数据对象,除此之外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行执行,这点尤其不同于LinkedBlockingQueue。
按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者的操作完全并行运行,DougLea之所以没有这样做,也许是因为ArrayBloickingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性,其在性能上完全占不到任何便宜。
ArrayBlockingQueue和LinkedBlockingQueue之间还有一个明显的不同之处在于,前者在插入或删除元素时,不会产生或销毁任何额外的对象实例,而后者则会生产一个额外的Node对象,这在长时间内需要高效并发地处理大批量的数据的系统中,其对于GC的影响还是存在一定区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。
– 感觉得先复习下Java并发相关的内容了,这部分的内容需要继续大量的补充,或者连接到相关文章–
一个生产者消费者模式的具体实现
这里用的是ConcurrentLinkedQueue,并不是BlockingQueue的实现类,它是无界非阻塞的缓冲队列,采用CAS同步算法,效率更高。
—————————— 采用ConcurrentLinkedQueue————————————-
Data.java
1 | public class Data{ |
2 | private final int data; |
3 | |
4 | public Data(int data){ |
5 | this.data = data; |
6 | } |
7 | //....getter and setter..... |
8 | } |
Producer.java
1 | public class Producer implements Runnable { |
2 | // 缓冲队列 |
3 | private ConcurrentLinkedQueue<Data> queue; |
4 | // 每个生产者生产数据的数量 |
5 | private int produceNum; |
6 | |
7 | // 任务计数,同时模拟数据生产 |
8 | private static AtomicInteger count = new AtomicInteger(0); |
9 | // 需要生产的总任务数量,出处有耦合性,实际开发时应当处理。 |
10 | private static AtomicInteger total = new AtomicInteger(Main.NUMS); |
11 | |
12 | public Producer(ConcurrentLinkedQueue<Data> queue, int produceNum) { |
13 | this.queue = queue; |
14 | this.produceNum = produceNum; |
15 | } |
16 | |
17 | |
18 | public void run() { |
19 | Data data = null; |
20 | // 限定生产,防止超量生产 |
21 | if (total.get() <= 0) { |
22 | Thread.currentThread().interrupt(); |
23 | } |
24 | for (int i = 0; i < produceNum; i++) { |
25 | // 再次检测,防止超量生产 |
26 | if (total.get() > 0) { |
27 | // 构造数据 |
28 | data = new Data(count.incrementAndGet()); |
29 | // 向缓冲队列中提交 |
30 | // ConcurrentLinkedQueue是无界非阻塞的,没有put方法 |
31 | if (!queue.offer(data)) { |
32 | System.out.println("Filed to put data:" + data); |
33 | } else { |
34 | System.out.println("Successfully produced data:" + data); |
35 | } |
36 | // 需要生产的总量-1 |
37 | total.decrementAndGet(); |
38 | } else { |
39 | Thread.currentThread().interrupt(); |
40 | break; |
41 | } |
42 | } |
43 | } |
44 | } |
Consumer.java
1 | public class Consumer implements Runnable { |
2 | private ConcurrentLinkedQueue<Data> queue; |
3 | private CountDownLatch countDown; |
4 | |
5 | public Consumer(ConcurrentLinkedQueue<Data> queue, CountDownLatch countDown) { |
6 | this.queue = queue; |
7 | this.countDown = countDown; |
8 | } |
9 | |
10 | |
11 | public void run() { |
12 | Data data = queue.poll(); |
13 | if (data != null) { |
14 | // 计算+1 |
15 | int res = data.getData() + 1; |
16 | System.out.println("Data processing: " + MessageFormat.format("{0}+1={1}", data.getData(), res)); |
17 | // 记录该线程任务已结束 |
18 | countDown.countDown(); |
19 | } |
20 | } |
21 | } |
Main.java
1 | public class Main { |
2 | public static final int NUMS = 1000000; |
3 | |
4 | public static void main(String[] args) { |
5 | |
6 | // 定义缓冲队列 |
7 | ConcurrentLinkedQueue<Data> queue = new ConcurrentLinkedQueue<>(); |
8 | // 创建生产者和消费者线程池,为防止冲爆内存,这里限定线程数 |
9 | ExecutorService producerEs = Executors.newFixedThreadPool(100); |
10 | ExecutorService consumerEs = Executors.newFixedThreadPool(100); |
11 | // 定义计时器,等待消费者全部消费完 |
12 | CountDownLatch countDown = new CountDownLatch(NUMS); |
13 | |
14 | long start = System.currentTimeMillis(); |
15 | long end = System.currentTimeMillis(); |
16 | |
17 | // 向线程池中提交 NUMS 个生产任务 |
18 | for (int i = 0; i < NUMS; i++) { |
19 | // 每个生产者以3倍的量生产,模拟生产者和消费者的速度差 |
20 | producerEs.execute(new Producer(queue,3)); |
21 | consumerEs.execute(new Consumer(queue, countDown)); |
22 | } |
23 | try { |
24 | countDown.await(); |
25 | end = System.currentTimeMillis(); |
26 | producerEs.shutdown(); |
27 | consumerEs.shutdown(); |
28 | } catch (InterruptedException e) { |
29 | e.printStackTrace(); |
30 | } |
31 | |
32 | System.out.println("Total time:" + (end - start)); |
33 | } |
34 | } |
———————————采用BlockingQueue 实现—————————————
在jdk里面,有一个关于消费者模式更为简要的代码描述,利用的阻塞队列来实现的
1 | class Producer implements Runnable { |
2 | private final BlockingQueue queue; |
3 | Producer(BlockingQueue q) { queue = q; } |
4 | public void run() { |
5 | try { |
6 | while (true) { queue.put(produce()); } |
7 | } catch (InterruptedException ex) { ... handle ...} |
8 | } |
9 | Object produce() { ... } |
10 | } |
11 | |
12 | class Consumer implements Runnable { |
13 | private final BlockingQueue queue; |
14 | Consumer(BlockingQueue q) { queue = q; } |
15 | public void run() { |
16 | try { |
17 | while (true) { consume(queue.take()); } |
18 | } catch (InterruptedException ex) { ... handle ...} |
19 | } |
20 | void consume(Object x) { ... } |
21 | } |
22 | |
23 | class Setup { |
24 | void main() { |
25 | BlockingQueue q = new SomeQueueImplementation(); |
26 | Producer p = new Producer(q); |
27 | Consumer c1 = new Consumer(q); |
28 | Consumer c2 = new Consumer(q); |
29 | new Thread(p).start(); |
30 | new Thread(c1).start(); |
31 | new Thread(c2).start(); |
32 | } |
33 | } |