简述
在上一篇文章中,我们使用了可重入锁ReentrantLock和条件Condition实现了一个简单的阻塞队列。其实还是可以继续优化,比如可以像LinkedBlockingQueue一样,把生产者的锁和消费者的锁分开。
代码
队列代码
public class MaxBlockingQueue<T> {
/** 存放元素的数组 */
private final Object[] items;
/** 弹出元素的位置 */
private int takeIndex;
/** 插入元素的位置 */
private int putIndex;
/** 队列中的元素总数 */
private AtomicInteger count = new AtomicInteger(0);
/** 插入锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 队列未满的条件变量 */
private final Condition notFull = putLock.newCondition();
/** 弹出锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 队列非空的条件变量 */
private final Condition notEmpty = takeLock.newCondition();
/**
* 指定队列大小的构造器
*
* @param capacity 队列大小
*/
public MaxBlockingQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
items = new Object[capacity];
}
/**
* 入队操作
*
* @param e 待插入的对象
*/
private void enqueue(Object e) {
// 将对象e放入putIndex指向的位置
items[putIndex] = e;
putIndex++;
// putIndex向后移一位,如果已到末尾则返回队列开头(位置0)
if (putIndex == items.length)
putIndex = 0;
}
/**
* 出队操作
*
* @return 被弹出的元素
*/
private Object dequeue() {
// 取出takeIndex指向位置中的元素
// 并将该位置清空
Object e = items[takeIndex];
items[takeIndex] = null;
takeIndex++;
// takeIndex向后移一位,如果已到末尾则返回队列开头(位置0)
if (takeIndex == items.length)
takeIndex = 0;
// 返回之前代码中取出的元素e
return e;
}
/**
* 将指定元素插入队列
*
* @param e 待插入的对象
*/
public void put(Object e) throws InterruptedException {
int c = -1;
putLock.lock();
try {
while (count.get() == items.length) {
// 队列已满时进入休眠
// 等待队列未满条件得到满足
notFull.await();
}
// 执行入队操作,将对象e实际放入队列中
enqueue(e);
// 增加元素总数
c = count.getAndIncrement();
// 如果在插入后队列仍然没满,则唤醒其他等待插入的线程
if (c + 1 < items.length)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果插入之前队列为空,才唤醒等待弹出元素的线程
// 为了防止死锁,不能在释放putLock之前获取takeLock
if (c == 0)
signalNotEmpty();
}
/**
* 唤醒等待队列非空条件的线程
*/
private void signalNotEmpty() {
// 为了唤醒等待队列非空条件的线程,需要先获取对应的takeLock
takeLock.lock();
try {
// 唤醒一个等待非空条件的线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
/**
* 从队列中弹出一个元素
*
* @return 被弹出的元素
*/
public T take() throws InterruptedException {
Object e;
int c = -1;
takeLock.lock();
try {
while (count.get() == 0) {
// 队列为空时进入休眠
// 等待队列非空条件得到满足
notEmpty.await();
}
// 执行出队操作,将队列中的第一个元素弹出
e = dequeue();
// 减少元素总数
c = count.getAndDecrement();
// 如果队列在弹出一个元素后仍然非空,则唤醒其他等待队列非空的线程
if (c - 1 > 0)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 只有在弹出之前队列已满的情况下才唤醒等待插入元素的线程
// 为了防止死锁,不能在释放takeLock之前获取putLock
if (c == items.length)
signalNotFull();
return (T)e;
}
/**
* 唤醒等待队列未满条件的线程
*/
private void signalNotFull() {
// 为了唤醒等待队列未满条件的线程,需要先获取对应的putLock
putLock.lock();
try {
// 唤醒一个等待队列未满条件的线程
notFull.signal();
} finally {
putLock.unlock();
}
}
}
评论区