队列源码分析

image-20250210162453218

1. 双端队列ArrayDeque

ArrayDeque 是基于数组实现的可动态扩容的双端队列。

1.1 初始化

和HashMap类似的方式, 容量一定是2的幂次方

private static int calculateSize(int numElements) {
    int initialCapacity = MIN_INITIAL_CAPACITY;
    // Find the best power of two to hold elements.
    // Tests "<=" because arrays aren't kept full.
    if (numElements >= initialCapacity) {
        initialCapacity = numElements;
        initialCapacity |= (initialCapacity >>>  1);
        initialCapacity |= (initialCapacity >>>  2);
        initialCapacity |= (initialCapacity >>>  4);
        initialCapacity |= (initialCapacity >>>  8);
        initialCapacity |= (initialCapacity >>> 16);
        initialCapacity++;
        if (initialCapacity < 0)   // Too many elements, must back off
            initialCapacity >>>= 1;// Good luck allocating 2 ^ 30 element
    }
    return initialCapacity;
}

1.2 数据插入

deque.push等于deque.offerFirst,两个源码如下

public void addFirst(E e) {
    if (e == null)
        throw new NullPointerException();
    elements[head = (head - 1) & (elements.length - 1)] = e;
    if (head == tail)
        doubleCapacity();
}
public void addLast(E e) {
    if (e == null)
        throw new NullPointerException();
    elements[tail] = e;
    if ( (tail = (tail + 1) & (elements.length - 1)) == head)
        doubleCapacity();
}

  1. 判断插入数据

  2. 插入

  3. 判断是否需要扩容

  • head 是一个索引,表示队列的首元素(头部元素)所在的底层数组中的位置。
  • tail 是一个索引,表示队列的末尾位置 之后的一个空槽

初始时 tail和head都为0,环形数组

1.3 数组扩容和数组迁移

及其是数组实现就避免不了扩容和数据迁移到新的位置

private void doubleCapacity() {
    assert head == tail;
    int p = head;
    int n = elements.length;
    int r = n - p; // number of elements to the right of p
    int newCapacity = n << 1;
    if (newCapacity < 0)
        throw new IllegalStateException("Sorry, deque too big");
    Object[] a = new Object[newCapacity];
    System.arraycopy(elements, p, a, 0, r);
    System.arraycopy(elements, 0, a, r, p);
    elements = a;
    head = 0;
    tail = n;
}


/*
	src:源数组(需要复制的数组)。
	srcPos:源数组的起始位置(从哪里开始复制,索引从 0 开始)。
	dest:目标数组(复制的目标数组)。
	destPos:目标数组的起始位置(目标数组的起始索引位置,将源数组元素复制到目标数组的哪个位置)。 
	length:要复制的元素个数
*/
public static void arraycopy(Object src, int srcPos, Object dest, int destPos, int length)
    
  

2. 双端队列LinkedList

链表实现,从头尾取数据时间复杂度O(1), 同时数据的插入和删除也不需要像数组队列那样拷贝数据。

2.1 头插入

deque.push("a") 、 deque.offerFirst("a")

private void linkFirst(E e) {
    final Node<E> f = first;
    final Node<E> newNode = new Node<>(null, e, f);
    first = newNode;
    if (f == null)
        last = newNode;
    else
        f.prev = newNode;
    size++;
    modCount++;
}

2.2 尾插入

deque.offerLast("e");

void linkLast(E e) {
    final Node<E> l = last;
    final Node<E> newNode = new Node<>(l, e, null);
    last = newNode;
    if (l == null)
        first = newNode;
    else
        l.next = newNode;
    size++;
    modCount++;
}
 

3 优先队列PriorityQueue

优先队列本质是一个数组构成的二叉堆

二叉堆

  • 假如父节点为queue[n],那么左子节点为queue[2n+1],右子节点为queue[2n+2]
  • 任意孩子节点的父节点位置,都是 (n-1)>>>1 相当于减1后除2取整
  • 父节点小于等于任意孩子节点
  • 同一层级的两个孩子节点大小不需要维护,它是在弹出元素的时候进行判断的
  • 一个长度为 size 的优先级队列,当 index >= size >>> 1 时,该节点为叶子节点。否则,为非叶子节点。

3.1 扩容机制

private void grow(int minCapacity) {
    int oldCapacity = queue.length;
    // Double size if small; else grow by 50%
    int newCapacity = oldCapacity + ((oldCapacity < 64) ?
                                     (oldCapacity + 2) :
                                     (oldCapacity >> 1));
    // overflow-conscious code
    if (newCapacity - MAX_ARRAY_SIZE > 0)
        newCapacity = hugeCapacity(minCapacity);
    queue = Arrays.copyOf(queue, newCapacity);
}
  • 如果数组长度小于64:每次增加2
  • 如果数组长度大于64:每次翻倍
  • 如果超过整数上限则设置为上限

3.2 元素入队

public boolean offer(E e) {
    	if (e == null)
            throw new NullPointerException();
        modCount++;
        int i = size;
        if (i >= queue.length)
            grow(i + 1);
        siftUp(i, e);
        size = i + 1;
        return true;
}
// siftUp会使用Cimparator进行元素比较,如果没有指定具体Comparator,就采用元素对应的Comparator
private void siftUpUsingComparator(int k, E x) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = queue[parent];
        if (comparator.compare(x, (E) e) >= 0)
            break;
        queue[k] = e;
        k = parent;
    }
    queue[k] = x;
}

3.3 元素出队

public E poll() {
        final Object[] es;
        final E result;

        if ((result = (E) ((es = queue)[0])) != null) {
            modCount++;
            final int n;
            final E x = (E) es[(n = --size)];
            es[n] = null;
            if (n > 0) {
                final Comparator<? super E> cmp;
                if ((cmp = comparator) == null)
                    siftDownComparable(0, x, es, n);
                else
                    siftDownUsingComparator(0, x, es, n, cmp);
            }
        }
        return result;
    }

private static <T> void siftDownUsingComparator(
        int k, T x, Object[] es, int n, Comparator<? super T> cmp) {
        // assert n > 0;
        int half = n >>> 1;
        while (k < half) {
            int child = (k << 1) + 1;
            Object c = es[child];
            int right = child + 1;
            if (right < n && cmp.compare((T) c, (T) es[right]) > 0)
                c = es[child = right];
            if (cmp.compare(x, (T) c) <= 0)
                break;
            es[k] = c;
            k = child;
        }
        es[k] = x;
    }

将数组元素从根节点,放到叶子节点,然后删掉。

4. 延时队列DelayQueue

DelayQueue 是阻塞队列实现的,扩充了PriorityQueue和ReentrantLock锁。

4.1 元素入队

delayQueue.offer(new TestDelayed("aaa", 5, TimeUnit.SECONDS));

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        q.offer(e);
        if (q.peek() == e) {
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

4.2 元素出队

最小化的空等时间,提高线程利用率。有锁

 public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0)
                ? null
                : q.poll();
        } finally {
            lock.unlock();
        }
    }

4.3 元素消费

private final Condition available = lock.newCondition();
public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
  • DelayQueue 使用 leader/follower 模式来优化线程唤醒。 当一个线程发现队列头部的元素还未到期时,它就成为 leader,负责等待剩余的时间。 其他线程到达时,如果发现已经有 leader,就直接进入等待状态,而不用竞争 leader 的身份。 这样可以减少不必要的唤醒和上下文切换,提高性能。

  • available: available是Condition,比wait()/notify() 机制更细粒度的线程控制, Condition 允许你创建多个等待集合

常见面试问题

单端队列和双端队列,分别对应的实现类是哪个?

  • 单端队列 (Queue):

    LinkedList: LinkedList 实现了 Queue 接口,可以用作单端队列。 它基于链表,提供了先进先出 (FIFO) 的特性。

    PriorityQueue: 优先级队列,但也可以理解为一种特殊的单端队列,只不过出队顺序是按照元素的优先级来的。

  • 双端队列 (Deque):

    LinkedList: LinkedList 也实现了 Deque 接口,因此它也可以作为双端队列使用。 这意味着你可以在队列的两端进行插入和删除操作。

    ArrayDeque: 基于动态数组实现的双端队列,性能比 LinkedList 好,特别是在随机访问方面。

简述延迟队列/优先队列的实现方式

  • 延迟队列: 实现了优先队列,并进行扩充

    • 基于优先队列 + 时间戳:

      最常见的实现方式。

      • 使用 PriorityQueue 来存储元素。
      • 每个元素都关联一个 "到期时间" (时间戳)。
      • PriorityQueue 按照到期时间排序,到期时间最早的元素排在队首。
      • 从队列中取出元素前,检查队首元素的到期时间是否已过。如果未过,则阻塞(例如,通过 Thread.sleep()LockSupport.parkNanos())等待一段时间再检查。 可以使用一个专门的线程来做这个检查和等待。
    • 基于时间轮: 适合高并发场景。 将时间划分成多个槽位,每个槽位对应一个时间段。 将延迟任务放入对应的槽位中,然后一个时间轮指针按固定间隔移动,当指针指向一个槽位时,就处理该槽位中的任务。

  • 优先队列:

    • 二叉堆: 最常见的实现方式。 Java 中的 PriorityQueue 默认就是使用二叉堆实现的。

二叉堆插入/弹出元素的过程

  • 插入元素 (Insert):

    1. 添加到堆的末尾: 将新元素添加到堆的最后一个位置(数组的最后一个元素)。

    2. 上浮 (Heapify Up / Bubble Up):

      将新元素与其父节点进行比较。

      • 如果新元素比父节点更“优先”(对于最小堆,这意味着新元素更小;对于最大堆,这意味着新元素更大),则交换它们的位置。
      • 重复这个比较和交换的过程,直到新元素到达根节点,或者它的优先级不再高于其父节点。
  • 弹出元素 (Extract Min/Max):

    1. 取出根节点: 将根节点(最小堆中的最小值或最大堆中的最大值)取出。

    2. 用最后一个元素替换根节点: 将堆的最后一个元素移动到根节点的位置。

    3. 下沉 (Heapify Down / Sink Down):将新的根节点与其子节点进行比较。

      • 如果新的根节点比它的一个或多个子节点优先级更低(对于最小堆,这意味着新的根节点更大;对于最大堆,这意味着新的根节点更小),则选择优先级最高的子节点(对于最小堆,选择最小的子节点;对于最大堆,选择最大的子节点),并交换它们的位置。

      • 重复这个比较和交换的过程,直到新的根节点到达叶子节点,或者它的优先级不再低于其任何子节点。

延迟队列的使用场景?

  • 取消订单: 用户下单后,一段时间内未支付,自动取消订单。
  • 缓存过期: 缓存项在一段时间后自动过期。
  • 定时任务: 在指定时间执行某个任务(虽然通常有更专业的定时任务调度框架,但延迟队列可以作为一种轻量级的替代方案)。
  • 重试机制: 在某个操作失败后,延迟一段时间后重试。
  • 会话管理: 在用户长时间不活动后,使会话过期。
  • 通知推送: 在特定时间点向用户发送通知。

延迟队列为什么添加信号量?(JAVA使用的是Condition)

在某些延迟队列的实现中,特别是那些需要阻塞等待元素到期的实现中,信号量(Semaphore)可以用来优化性能和避免不必要的 CPU 消耗。 以下是使用信号量的原因:

  • 避免忙等待 (Busy-Waiting): 如果使用简单的 while 循环来检查队首元素是否到期,会导致 CPU 持续运行,即使队列中没有到期的元素。 这被称为忙等待,效率很低。
  • 线程唤醒: 当有新的元素添加到延迟队列中,并且该元素的到期时间比当前队首元素的到期时间更早时,需要唤醒正在等待的线程,以便它能够重新检查队首元素

信号量的工作方式:

  1. 初始化信号量: 信号量初始化为 0。
  2. 等待: 当线程从延迟队列中取元素时,如果队首元素未到期,线程会调用 semaphore.acquire() 方法来阻塞自己。 acquire() 方法会阻塞线程,直到信号量的值大于 0。
  3. 添加元素和释放信号量: 当向延迟队列中添加新元素时,如果新元素的到期时间比当前队首元素的到期时间更早,会调用 semaphore.release() 方法来释放信号量。 release() 方法会将信号量的值增加 1,从而唤醒一个正在等待的线程。