Android Tech And Thoughts.

Producer consumer model

Word count: 2kReading time: 8 min
2019/12/24 Share

producer-consumer.png

Producer-Consumer模式在很多场景种都有相应的体现,比如线程池,对象池,MQ等,Pro-Con的本质是在生产者与消费者之间引入一个通道(Channel暂且理解为一个队列),该通道主要用于 ①控制生产者与消费者的相对速率,尽可能地保证生产的Product尽快被消费,另一方面②对二者进行解耦 ,生产者和消费者在各自的线程中,通过Channel连接,二者无直接关联。

Producer-Consumer中的角色

1.Product:生产者线程锁需要的产品
2.Producer:负责生产Product,并将其放入到队列Channel中
3.Consumer:从队列Channel中获取对应的产品,获取之后对其进行业务处理
4.Channel:这是最重要的概念,Channel就是二者共享的区域,Channel有着调控生产者与消费者相对速率的功能,内存缓存区也是生产者-消费者模式的核心组件

Channel的调控功能

  • Vp>Vc : 生产者放缓速度,反映到代码就是 生产者线程休眠
  • Vp<Vc: 消费者放缓速度,反映到代码就是 消费者线程休眠

需要注意的是,不是说比较下速度就立即休眠,而是有更具体的策略

Channel的实现方案

BlockingQueue

阻塞队列是一种常见的实现方案,在JDK中提供了多种BlockingQueue的数据结构。

1
public interface BlockingQueue<E> extends Queue<E> {
2
    boolean add(E var1);
3
4
    boolean offer(E var1);
5
6
    void put(E var1) throws InterruptedException;
7
8
    boolean offer(E var1, long var2, TimeUnit var4) throws InterruptedException;
9
10
    E take() throws InterruptedException;
11
12
    E poll(long var1, TimeUnit var3) throws InterruptedException;
13
14
    int remainingCapacity();
15
16
    boolean remove(Object var1);
17
18
    boolean contains(Object var1);
19
20
    int drainTo(Collection<? super E> var1);
21
22
    int drainTo(Collection<? super E> var1, int var2);
23
}

BlockingQueue的核心方法

方法 抛出异常 返回特殊值 超时退出 一直阻塞
入队方法 add(e) offer(e) offer(e,time,unit) put(e)
出队方法 remove() poll() poll(time,unit) take()

另外还有l两类函数,了解一下:
int drainTo(Collection<? super E> c);: 移除此队列中所有可用的元素,并将它们添加到给定collection 中。此操作可能比反复轮询此队列更有效。在试图向 collection c 中添加元素没有成功时,可能导致在抛出相关异常时。如果试图将一个队列放入自身队列中,则会导致 IllegalArgumentException异常。此外,如果正在进行此操作时修改指定的 collection,则此操作行为是不确定的
int drainTo(Collection<? super E> c, int maxElements): 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定collection中,别的行为与上面int drainTo(Collection<? super E> c)一样。
E peek(): 获取但不移除此队列的头元素;如果此队列为空,则返回 null

BlockingQueue成员类(实现类)

1.ArrayBlockingQueue

基于数组的阻塞队列的实现,在ArrayBlockingQueue内部维护了一个定长数组,以便缓存队列中的数据对象,除此之外,ArrayBlockingQueue内部还保存着两个整型变量,分别标识着队列的头部和尾部在数组中的位置。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用一个锁对象,由此也意味着两者无法真正并行执行,这点尤其不同于LinkedBlockingQueue。

按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者的操作完全并行运行,DougLea之所以没有这样做,也许是因为ArrayBloickingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性,其在性能上完全占不到任何便宜。

ArrayBlockingQueue和LinkedBlockingQueue之间还有一个明显的不同之处在于,前者在插入或删除元素时,不会产生或销毁任何额外的对象实例,而后者则会生产一个额外的Node对象,这在长时间内需要高效并发地处理大批量的数据的系统中,其对于GC的影响还是存在一定区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

– 感觉得先复习下Java并发相关的内容了,这部分的内容需要继续大量的补充,或者连接到相关文章–

一个生产者消费者模式的具体实现

这里用的是ConcurrentLinkedQueue,并不是BlockingQueue的实现类,它是无界非阻塞的缓冲队列,采用CAS同步算法,效率更高。

—————————— 采用ConcurrentLinkedQueue————————————-

Data.java

1
public class Data{
2
	private final int data;
3
	
4
	public Data(int data){
5
		this.data = data;
6
	}
7
	//....getter and setter.....
8
}

Producer.java

1
public class Producer implements Runnable {
2
    //  缓冲队列
3
    private ConcurrentLinkedQueue<Data> queue;
4
    //  每个生产者生产数据的数量
5
    private int produceNum;
6
7
    //  任务计数,同时模拟数据生产
8
    private static AtomicInteger count = new AtomicInteger(0);
9
    //  需要生产的总任务数量,出处有耦合性,实际开发时应当处理。
10
    private static AtomicInteger total = new AtomicInteger(Main.NUMS);
11
12
    public Producer(ConcurrentLinkedQueue<Data> queue, int produceNum) {
13
        this.queue = queue;
14
        this.produceNum = produceNum;
15
    }
16
17
    @Override
18
    public void run() {
19
        Data data = null;
20
        //  限定生产,防止超量生产
21
        if (total.get() <= 0) {
22
            Thread.currentThread().interrupt();
23
        }
24
        for (int i = 0; i < produceNum; i++) {
25
            //  再次检测,防止超量生产
26
            if (total.get() > 0) {
27
                //  构造数据
28
                data = new Data(count.incrementAndGet());
29
                //  向缓冲队列中提交
30
                //  ConcurrentLinkedQueue是无界非阻塞的,没有put方法
31
                if (!queue.offer(data)) {
32
                    System.out.println("Filed to put data:" + data);
33
                } else {
34
                    System.out.println("Successfully produced data:" + data);
35
                }
36
                //  需要生产的总量-1
37
                total.decrementAndGet();
38
            } else {
39
                Thread.currentThread().interrupt();
40
                break;
41
            }
42
        }
43
    }
44
}

Consumer.java

1
public class Consumer implements Runnable {
2
    private ConcurrentLinkedQueue<Data> queue;
3
    private CountDownLatch countDown;
4
5
    public Consumer(ConcurrentLinkedQueue<Data> queue, CountDownLatch countDown) {
6
        this.queue = queue;
7
        this.countDown = countDown;
8
    }
9
10
    @Override
11
    public void run() {
12
        Data data = queue.poll();
13
        if (data != null) {
14
            //  计算+1
15
            int res = data.getData() + 1;
16
            System.out.println("Data processing: " + MessageFormat.format("{0}+1={1}", data.getData(), res));
17
            //  记录该线程任务已结束
18
            countDown.countDown();
19
        }
20
    }
21
}

Main.java

1
public class Main {
2
    public static final int NUMS = 1000000;
3
4
    public static void main(String[] args) {
5
6
        //  定义缓冲队列
7
        ConcurrentLinkedQueue<Data> queue = new ConcurrentLinkedQueue<>();
8
        //  创建生产者和消费者线程池,为防止冲爆内存,这里限定线程数
9
        ExecutorService producerEs = Executors.newFixedThreadPool(100);
10
        ExecutorService consumerEs = Executors.newFixedThreadPool(100);
11
        //  定义计时器,等待消费者全部消费完
12
        CountDownLatch countDown = new CountDownLatch(NUMS);
13
14
        long start = System.currentTimeMillis();
15
        long end = System.currentTimeMillis();
16
17
        //  向线程池中提交 NUMS 个生产任务
18
        for (int i = 0; i < NUMS; i++) {
19
            //  每个生产者以3倍的量生产,模拟生产者和消费者的速度差
20
            producerEs.execute(new Producer(queue,3));
21
            consumerEs.execute(new Consumer(queue, countDown));
22
        }
23
        try {
24
            countDown.await();
25
            end = System.currentTimeMillis();
26
            producerEs.shutdown();
27
            consumerEs.shutdown();
28
        } catch (InterruptedException e) {
29
            e.printStackTrace();
30
        }
31
32
        System.out.println("Total time:" + (end - start));
33
    }
34
}

———————————采用BlockingQueue 实现—————————————

在jdk里面,有一个关于消费者模式更为简要的代码描述,利用的阻塞队列来实现的

1
class Producer implements Runnable {
2
  private final BlockingQueue queue;
3
  Producer(BlockingQueue q) { queue = q; }
4
  public void run() {
5
    try {
6
      while (true) { queue.put(produce()); }
7
    } catch (InterruptedException ex) { ... handle ...}
8
  }
9
  Object produce() { ... }
10
}
11
12
class Consumer implements Runnable {
13
  private final BlockingQueue queue;
14
  Consumer(BlockingQueue q) { queue = q; }
15
  public void run() {
16
    try {
17
      while (true) { consume(queue.take()); }
18
    } catch (InterruptedException ex) { ... handle ...}
19
  }
20
  void consume(Object x) { ... }
21
}
22
23
class Setup {
24
  void main() {
25
    BlockingQueue q = new SomeQueueImplementation();
26
    Producer p = new Producer(q);
27
    Consumer c1 = new Consumer(q);
28
    Consumer c2 = new Consumer(q);
29
    new Thread(p).start();
30
    new Thread(c1).start();
31
    new Thread(c2).start();
32
  }
33
}

Tahnks

BlockingQueue
深入浅出生产者-消费者模式
BlockingQueue-JDK

CATALOG
  1. 1. Producer-Consumer中的角色
  2. 2. Channel的实现方案
    1. 2.1. BlockingQueue
      1. 2.1.1. BlockingQueue的核心方法
      2. 2.1.2. BlockingQueue成员类(实现类)
  3. 3. 一个生产者消费者模式的具体实现
    1. 3.1. Tahnks