侧边栏壁纸
博主头像
Terry

『LESSON 5』

  • 累计撰写 90 篇文章
  • 累计创建 21 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

Java实现阻塞队列(二)

Terry
2020-11-29 / 0 评论 / 0 点赞 / 692 阅读 / 2,791 字 / 正在检测是否收录...

简述

在上一篇文章中,我们使用了可重入锁ReentrantLock和条件Condition实现了一个简单的阻塞队列。其实还是可以继续优化,比如可以像LinkedBlockingQueue一样,把生产者的锁和消费者的锁分开。

代码

队列代码

public class MaxBlockingQueue<T> {

    /** 存放元素的数组 */
    private final Object[] items;

    /** 弹出元素的位置 */
    private int takeIndex;

    /** 插入元素的位置 */
    private int putIndex;

    /** 队列中的元素总数 */
    private AtomicInteger count = new AtomicInteger(0);

    /** 插入锁 */
    private final ReentrantLock putLock = new ReentrantLock();

    /** 队列未满的条件变量 */
    private final Condition notFull = putLock.newCondition();

    /** 弹出锁 */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** 队列非空的条件变量 */
    private final Condition notEmpty = takeLock.newCondition();

    /**
     * 指定队列大小的构造器
     *
     * @param capacity  队列大小
     */
    public MaxBlockingQueue(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    /**
     * 入队操作
     *
     * @param e 待插入的对象
     */
    private void enqueue(Object e) {
        // 将对象e放入putIndex指向的位置
        items[putIndex] = e;
        putIndex++;
        // putIndex向后移一位,如果已到末尾则返回队列开头(位置0)
        if (putIndex == items.length)
            putIndex = 0;
    }

    /**
     * 出队操作
     *
     * @return  被弹出的元素
     */
    private Object dequeue() {
        // 取出takeIndex指向位置中的元素
        // 并将该位置清空
        Object e = items[takeIndex];
        items[takeIndex] = null;
        takeIndex++;
        // takeIndex向后移一位,如果已到末尾则返回队列开头(位置0)
        if (takeIndex == items.length)
            takeIndex = 0;

        // 返回之前代码中取出的元素e
        return e;
    }

    /**
     * 将指定元素插入队列
     *
     * @param e 待插入的对象
     */
    public void put(Object e) throws InterruptedException {
        int c = -1;
        putLock.lock();
        try {
            while (count.get() == items.length) {
                // 队列已满时进入休眠
                // 等待队列未满条件得到满足
                notFull.await();
            }

            // 执行入队操作,将对象e实际放入队列中
            enqueue(e);

            // 增加元素总数
            c = count.getAndIncrement();

            // 如果在插入后队列仍然没满,则唤醒其他等待插入的线程
            if (c + 1 < items.length)
                notFull.signal();

        } finally {
            putLock.unlock();
        }

        // 如果插入之前队列为空,才唤醒等待弹出元素的线程
        // 为了防止死锁,不能在释放putLock之前获取takeLock
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * 唤醒等待队列非空条件的线程
     */
    private void signalNotEmpty() {
        // 为了唤醒等待队列非空条件的线程,需要先获取对应的takeLock
        takeLock.lock();
        try {
            // 唤醒一个等待非空条件的线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * 从队列中弹出一个元素
     *
     * @return  被弹出的元素
     */
    public T take() throws InterruptedException {
        Object e;
        int c = -1;
        takeLock.lock();
        try {
            while (count.get() == 0) {
                // 队列为空时进入休眠
                // 等待队列非空条件得到满足
                notEmpty.await();
            }

            // 执行出队操作,将队列中的第一个元素弹出
            e = dequeue();

            // 减少元素总数
            c = count.getAndDecrement();

            // 如果队列在弹出一个元素后仍然非空,则唤醒其他等待队列非空的线程
            if (c - 1 > 0)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }

        // 只有在弹出之前队列已满的情况下才唤醒等待插入元素的线程
        // 为了防止死锁,不能在释放takeLock之前获取putLock
        if (c == items.length)
            signalNotFull();

        return (T)e;
    }

    /**
     * 唤醒等待队列未满条件的线程
     */
    private void signalNotFull() {
        // 为了唤醒等待队列未满条件的线程,需要先获取对应的putLock
        putLock.lock();
        try {
            // 唤醒一个等待队列未满条件的线程
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
    
}

参考

从0到1实现自己的阻塞队列(下)

0

评论区