侧边栏壁纸
博主头像
Terry

『LESSON 5』

  • 累计撰写 90 篇文章
  • 累计创建 21 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

Java阻塞队列

Terry
2020-11-28 / 0 评论 / 0 点赞 / 693 阅读 / 4,851 字 / 正在检测是否收录...

简述

在Java中,我们经常使用的三个阻塞队列是ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue。今天我们简单说下这三个阻塞队列的特点,以后自己也是实现一个阻塞队列。

三种阻塞队列特点

LinkedBlockingQueue

首先我们来看下LinkedBlockingQueue的成员变量

    static class Node<E> {
        E item;

        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger();

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

可以看到:

  1. LinkedBlockingQueue中,锁是分为takeLock和putLock两把锁。
  2. 根据Node的数据结构和head/last两个属性,可以看出LinkedBlockingQueue是链表结构的阻塞队列。
  3. 由构造函数可知,LinkedBlockingQueue默认支持的长度为Integer.MAX_VALUE,自己也可以设置LinkedBlockingQueue的长度。
  4. LinkedBlockingQueue计算count的时候,使用了AtomicInteger来保证原子性。

ArrayBlockingQueue

    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

可以看到:

  1. ArrayBlockingQueue只有一把可重入锁,可以是公平模式也可以是非公平模式。
  2. ArrayBlockingQueue维护的是一个组数结构,并且初始化的时候必须初始化数组大小。
  3. ArrayBlockingQueue统计是直接使用了int类型成员变量count统计。因为count操作都是用锁保证了原子性,是在临界区内,所以可以使用int类型。

SynchronousQueue

    /**
     * The transferer. Set only in constructor, but cannot be declared
     * as final without further complicating serialization.  Since
     * this is accessed only at most once per public method, there
     * isn't a noticeable performance penalty for using volatile
     * instead of final here.
     */
    private transient volatile Transferer<E> transferer;

    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
    }

SynchronousQueue是一个比较特别的队列,主要实现在transfer这个方法中。里面使用了大量的CAS操作来保证原子性。
本来生产者-消费者模式是这样的:

  1. 有多个生产者,可以并发生产产品,把产品置入队列中,如果队列满了,生产者就会阻塞。
  2. 有多个消费者,并发从队列中获取产品,如果队列空了,消费者就会阻塞。

但是SynchronousQueue比较特别,没有内部容器。当生产产品的时候,如果没有人消费,则生产线程阻塞,等待消费线程调用take操作,获取到产品并且会把生产线程唤醒。
SynchronousQueue也支持公平模式和非公平模式。

总结

今天我看解析了下三个常用的阻塞队列的特点,我们在日常使用中,new ThreadPoolExecutor必须指定队列,可以根据此三个队列的特点选择。
LinkedBlockingQueue和ArrayBlockingQueue不同点是:
在数据结构上,LinkedBlockingQueue使用了链表,而ArrayBlockingQueue使用了数组。
在锁机制上,LinkedBlockingQueue的锁是分离的,生产者有自己的锁,消费者也有自己的锁;而ArrayBlockingQueue生产者和消费者使用了通一把锁。
在构造函数上,LinkedBlockingQueue默认大小是Integer.MAX_VALUE,自己也能指定大小;而ArrayBlockingQueue因为容器是数组,所以必须指定大小。
SynchronousQueue是一个比较特殊的队列,如果没有消费者消费,生产者会阻塞;当消费者消费的时候,生产者才会生产。

0

评论区