/** * Linked list node class * 队列结点类型Node */ staticclassNode<E> { E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next;
Node(E x) { item = x; } }
/** The capacity bound, or Integer.MAX_VALUE if none */ //队列的的大小,初始化的时候设置,如果没有传入默认为Integer.MAX_VALUE privatefinalint capacity;
/** Current number of elements */ //队列中元素的个数,由于多线程访问修改,使用了线程安全的AtomicInteger类。 privatefinal AtomicInteger count = new AtomicInteger();
/** * Head of linked list. * Invariant: head.item == null * 队列的头结点 */ transient Node<E> head;
/** * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ publicLinkedBlockingQueue(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); //初始化队列大小 this.capacity = capacity; //初始化链表 last = head = new Node<E>(null); }
/** * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ publicLinkedBlockingQueue(){ //默认为Integer.MAX_VALUE 可认为无限大 this(Integer.MAX_VALUE); }
/** * Inserts the specified element at the tail of this queue, waiting if * necessary for space to become available. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicvoidput(E e)throws InterruptedException { if (e == null) thrownew NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; //新建入队结点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取入队锁,这里调用的lockInterruptibly()方法, //而不是lock()方法,是为了更友好。lock()方法在没有成功获取到锁的的时 //候会一直block,打死不回头。而lockInterruptibly()方法在阻塞的时候 //如果被中断,线程会被唤醒并且处理中断异常,选择是继续阻塞还是返回了。 //( ReentrantLock对象中还有tryLock()方法,这个方法是,不会阻塞马上返 //回,拿到锁后就返回true否则返回false,比较潇洒。tryLock(time)方法是 //拿不到锁的时候,会阻塞time时间,超时就返回false,比较友好。) putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //如果容量满了 就一直阻塞 while (count.get() == capacity) { notFull.await(); } //入队操作(后面会讲到) enqueue(node); //队列的元素个数加一,并且返回队列元素个数(注意getAndIncrement返回的是旧值) c = count.getAndIncrement(); //如果链表没有满,则发出通知 ,唤醒一个等待入队的线程 if (c + 1 < capacity) notFull.signal(); } finally { //释放锁 putLock.unlock(); } //如果入队之前队列是空的,那么现在可以唤醒一个等待出队的线程 if (c == 0) signalNotEmpty(); }
/** * Inserts the specified element at the tail of this queue, waiting if * necessary up to the specified wait time for space to become available. * * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before space is available * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ publicbooleanoffer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) thrownew NullPointerException(); //将时间转换成纳秒 long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { //容量满了 然后判断有没有传入等待时间 //小于0的(即就是没有配置等待时间)不需要阻塞直接返回,失败 if (nanos <= 0) returnfalse; //等待一定时间,返回0或者小于0的一个值 nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); returntrue; }
入队的链表操作
1 2 3 4 5 6 7 8 9 10 11 12 13
/** * Links node at end of queue. * * @param node the node */ privatevoidenqueue(Node<E> node){ // assert putLock.isHeldByCurrentThread(); // assert last.next == null; //其实相当于两步 // last.next = node 将last的下个结点指向新结点 // last = last.next 将last重新指向新节点 last = last.next = node; }
//和offer方法几乎是一样的。注释就不写了。大家都能看懂 public E poll(long timeout, TimeUnit unit)throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) returnnull; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1 2 3 4 5 6 7 8 9 10 11 12
//具体的出队操作 //LinkedBlockingQueue队列中的head结点只是一个指向,具体的值是null的,队列中第一个元素是head->next //出队操作就是将head指向head->next,然后将head->next的值设置为null private E dequeue(){ Node<E> h = head; Node<E> first = h.next; h.next = h; // 帮助GC回收 head = first; E x = first.item; first.item = null; return x; }
我们看了LinkedBlockingQueue的出队和入队操作。我们现在来对比一下。
三种入队对比
offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false—>不阻塞
put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断—>阻塞
offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待,直到出现以下三种情况:—>阻塞