背景知识:
如果想要对LinkedBlockingQueue
原理有所了解的话, 就需要对Java的wait-notify线程通信机制有所了解.
在Object类里面, 定义了如下的几个方法wait()/notify()/notifyAll()
.
对某个lock对象进行加锁(synchronized(lock){...}
).
在锁的内部, 当前线程调用了lock.wait()
方法后, 当前线程就阻塞在当前的位置, 同时放弃了锁的占用, 此时其他的线程可以获取锁继续执行锁内的代码, 当然其他线程调用了lock.wait()
方法后, 也会阻塞在这个位置, 重复刚才的步骤.
当在锁内调用lock.notify()
方法后, 会随机唤醒一个阻塞在lock.wait()
处的线程, 被唤醒的线程会继续尝试获取锁, 如果获取锁成功, 继续执行下面的代码.
下面看一段基于消费者/生产者模型实现的阻塞队列
1 | public class BlockingList { |
在上面的代码中, 我们分别设置2个生产者, 2个消费者. 每个生产者生产1个数据, 每个消费者消费1个数据, 队列最大长度为1. 生产完之后等待1秒钟, 再让消费者去消费, 输出结果是
1 | Add -Thread-0 ---> start add : s0 |
好了, 有了对wait-notify有了一个基本认识之后, 我们继续来看 LinkedBlockingQueue.
An optionally-bounded blocking queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
上面是 LinkedBlockingQueue (Java Platform SE 7 )
的Java Doc上的说明, 从中可以看出, `LinkedBlockingQueue`` 是一个基于链表 可选边界的先进先出阻塞队列.
先从LinkedBlockingQueue
的继承结构来大概了解一下`LinkedBlockingQueue``
我们看到LinkedBlockingQueue
实现了Queue
, BlockingQueue
俩个接口和继承了AbstractQueue
抽象类(其实它还实现了Collection
接口, 这不是我们的重点就不再讲解了).
Queue
定义了队列基础API, 定义三个入列方法, 三个出列方法.
add(E e)
: 插入成功返回true, 插入失败则抛出异常offer(E e)
: 插入成功返回true, 插入失败返回falseremove()
: 出列队首元素, 队列为空抛出异常.poll()
: 出列队首元素, 队列为空返回false.element()
: 返回队首元素, 队列为空抛出异常.peek()
:返回队首元素, 队列为空返回null.
BlockingQueue定义了阻塞相关API, 新增了俩个入列和俩个出列阻塞API.
put(E e)
: 队列非满则插入, 队列满则等待.offer(E e, long timeout, TimeUnit unit)
: offer(E e)的带有等待时间的版本.take()
: 队列非空则出列, 队列空则等待.poll(long timeout, TimeUnit unit)
: poll()的带有等待时间的版本.
AbstractQueue针对上面的接口提供了一些默认的实现
add(E e)
: 基于offer(E e)实现remove()
: 基于poll()实现.element()
: 基于peek()实现.
现在我们就对LinkedBlockingQueue
有了一个大概的认识, 下面我从put()/take()
俩个方法来看一下它的阻塞功能是如何实现的
put()
1 | public void put(E e) throws InterruptedException { |
take()
1 | public E take() throws InterruptedException { |
通过源码我们可以看到, LinkedBlockingQueue
采用的是我们刚开始的wait-notify
机制实现的, 只不过它没有使用Object
提供的, 而是采用了java.util.concurrent.locks.Condition
实现的.
由于LinkedBlockingQueue
是基于链表做的, LinkedBlockingQueue
对队头和队尾各自使用了一把锁来做并发控制.
在put()
方法中, 关键点在于搞清楚下面俩点:
if(c +1< capacity) notFull.signal();
if(c ==0) signalNotEmpty();
1 | if (c + 1 < capacity) |
这个是判断在多线程的环境下起到作用, 假设现在有4个线程都在await()
处阻塞, take()
取出一个数据, 现在唤醒了一个线程, 那么当该线程继续put的时候, 通过该判断, 如果队列非满则将阻塞在await()出的线程继续唤醒, 直到队列满了或者全部唤醒.
1 | if (c == 0) |
由于前边是调用的count.getAndIncrement()
, 如果c为0, 那么现在队列里就有了一个元素, 唤醒阻塞在出列的await()
处的线程, 可以继续出列, 取数据了.
以上就是put()入列分析, 出列流程也类似, 大家可以自己分析一下