Mercurial Hosting > luan
diff src/org/eclipse/jetty/util/BlockingArrayQueue.java @ 802:3428c60d7cfc
replace jetty jars with source
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Wed, 07 Sep 2016 21:15:48 -0600 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/org/eclipse/jetty/util/BlockingArrayQueue.java Wed Sep 07 21:15:48 2016 -0600 @@ -0,0 +1,704 @@ +// +// ======================================================================== +// Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.util; + +import java.util.AbstractList; +import java.util.Collection; +import java.util.NoSuchElementException; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + + +/* ------------------------------------------------------------ */ +/** Queue backed by a circular array. + * + * This queue is uses a variant of the two lock queue algorithm to + * provide an efficient queue or list backed by a growable circular + * array. This queue also has a partial implementation of + * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and + * {@link #poll(long, TimeUnit)} methods. + * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is + * able to grow and provides a blocking put call. + * <p> + * The queue has both a capacity (the size of the array currently allocated) + * and a limit (the maximum size that may be allocated), which defaults to + * {@link Integer#MAX_VALUE}. + * + * @param <E> The element type + */ +public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E> +{ + public final int DEFAULT_CAPACITY=128; + public final int DEFAULT_GROWTH=64; + private final int _limit; + private final AtomicInteger _size=new AtomicInteger(); + private final int _growCapacity; + + private volatile int _capacity; + private Object[] _elements; + + private final ReentrantLock _headLock = new ReentrantLock(); + private final Condition _notEmpty = _headLock.newCondition(); + private int _head; + + // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing + // TODO verify this has benefits + private long _space0; + private long _space1; + private long _space2; + private long _space3; + private long _space4; + private long _space5; + private long _space6; + private long _space7; + + private final ReentrantLock _tailLock = new ReentrantLock(); + private int _tail; + + + /* ------------------------------------------------------------ */ + /** Create a growing partially blocking Queue + * + */ + public BlockingArrayQueue() + { + _elements=new Object[DEFAULT_CAPACITY]; + _growCapacity=DEFAULT_GROWTH; + _capacity=_elements.length; + _limit=Integer.MAX_VALUE; + } + + /* ------------------------------------------------------------ */ + /** Create a fixed size partially blocking Queue + * @param limit The initial capacity and the limit. + */ + public BlockingArrayQueue(int limit) + { + _elements=new Object[limit]; + _capacity=_elements.length; + _growCapacity=-1; + _limit=limit; + } + + /* ------------------------------------------------------------ */ + /** Create a growing partially blocking Queue. + * @param capacity Initial capacity + * @param growBy Incremental capacity. + */ + public BlockingArrayQueue(int capacity,int growBy) + { + _elements=new Object[capacity]; + _capacity=_elements.length; + _growCapacity=growBy; + _limit=Integer.MAX_VALUE; + } + + /* ------------------------------------------------------------ */ + /** Create a growing limited partially blocking Queue. + * @param capacity Initial capacity + * @param growBy Incremental capacity. + * @param limit maximum capacity. + */ + public BlockingArrayQueue(int capacity,int growBy,int limit) + { + if (capacity>limit) + throw new IllegalArgumentException(); + + _elements=new Object[capacity]; + _capacity=_elements.length; + _growCapacity=growBy; + _limit=limit; + } + + /* ------------------------------------------------------------ */ + public int getCapacity() + { + return _capacity; + } + + /* ------------------------------------------------------------ */ + public int getLimit() + { + return _limit; + } + + /* ------------------------------------------------------------ */ + @Override + public boolean add(E e) + { + return offer(e); + } + + /* ------------------------------------------------------------ */ + public E element() + { + E e = peek(); + if (e==null) + throw new NoSuchElementException(); + return e; + } + + /* ------------------------------------------------------------ */ + @SuppressWarnings("unchecked") + public E peek() + { + if (_size.get() == 0) + return null; + + E e = null; + _headLock.lock(); // Size cannot shrink + try + { + if (_size.get() > 0) + e = (E)_elements[_head]; + } + finally + { + _headLock.unlock(); + } + + return e; + } + + /* ------------------------------------------------------------ */ + public boolean offer(E e) + { + if (e == null) + throw new NullPointerException(); + + boolean not_empty=false; + _tailLock.lock(); // size cannot grow... only shrink + try + { + if (_size.get() >= _limit) + return false; + + // should we expand array? + if (_size.get()==_capacity) + { + _headLock.lock(); // Need to grow array + try + { + if (!grow()) + return false; + } + finally + { + _headLock.unlock(); + } + } + + // add the element + _elements[_tail]=e; + _tail=(_tail+1)%_capacity; + + not_empty=0==_size.getAndIncrement(); + + } + finally + { + _tailLock.unlock(); + } + + if (not_empty) + { + _headLock.lock(); + try + { + _notEmpty.signal(); + } + finally + { + _headLock.unlock(); + } + } + + return true; + } + + + /* ------------------------------------------------------------ */ + @SuppressWarnings("unchecked") + public E poll() + { + if (_size.get() == 0) + return null; + + E e = null; + _headLock.lock(); // Size cannot shrink + try + { + if (_size.get() > 0) + { + final int head=_head; + e = (E)_elements[head]; + _elements[head]=null; + _head=(head+1)%_capacity; + + if (_size.decrementAndGet()>0) + _notEmpty.signal(); + } + } + finally + { + _headLock.unlock(); + } + + return e; + } + + /* ------------------------------------------------------------ */ + /** + * Retrieves and removes the head of this queue, waiting + * if no elements are present on this queue. + * @return the head of this queue + * @throws InterruptedException if interrupted while waiting. + */ + @SuppressWarnings("unchecked") + public E take() throws InterruptedException + { + E e = null; + _headLock.lockInterruptibly(); // Size cannot shrink + try + { + try + { + while (_size.get() == 0) + { + _notEmpty.await(); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + final int head=_head; + e = (E)_elements[head]; + _elements[head]=null; + _head=(head+1)%_capacity; + + if (_size.decrementAndGet()>0) + _notEmpty.signal(); + } + finally + { + _headLock.unlock(); + } + + return e; + } + + /* ------------------------------------------------------------ */ + /** + * Retrieves and removes the head of this queue, waiting + * if necessary up to the specified wait time if no elements are + * present on this queue. + * @param time how long to wait before giving up, in units of + * <tt>unit</tt> + * @param unit a <tt>TimeUnit</tt> determining how to interpret the + * <tt>timeout</tt> parameter + * @return the head of this queue, or <tt>null</tt> if the + * specified waiting time elapses before an element is present. + * @throws InterruptedException if interrupted while waiting. + */ + @SuppressWarnings("unchecked") + public E poll(long time, TimeUnit unit) throws InterruptedException + { + + E e = null; + + long nanos = unit.toNanos(time); + + _headLock.lockInterruptibly(); // Size cannot shrink + try + { + try + { + while (_size.get() == 0) + { + if (nanos<=0) + return null; + nanos = _notEmpty.awaitNanos(nanos); + } + } + catch (InterruptedException ie) + { + _notEmpty.signal(); + throw ie; + } + + e = (E)_elements[_head]; + _elements[_head]=null; + _head=(_head+1)%_capacity; + + if (_size.decrementAndGet()>0) + _notEmpty.signal(); + } + finally + { + _headLock.unlock(); + } + + return e; + } + + /* ------------------------------------------------------------ */ + public E remove() + { + E e=poll(); + if (e==null) + throw new NoSuchElementException(); + return e; + } + + /* ------------------------------------------------------------ */ + @Override + public void clear() + { + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + _head=0; + _tail=0; + _size.set(0); + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + @Override + public boolean isEmpty() + { + return _size.get()==0; + } + + /* ------------------------------------------------------------ */ + @Override + public int size() + { + return _size.get(); + } + + /* ------------------------------------------------------------ */ + @SuppressWarnings("unchecked") + @Override + public E get(int index) + { + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + if (index<0 || index>=_size.get()) + throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); + int i = _head+index; + if (i>=_capacity) + i-=_capacity; + return (E)_elements[i]; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + @Override + public E remove(int index) + { + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + + if (index<0 || index>=_size.get()) + throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); + + int i = _head+index; + if (i>=_capacity) + i-=_capacity; + @SuppressWarnings("unchecked") + E old=(E)_elements[i]; + + if (i<_tail) + { + System.arraycopy(_elements,i+1,_elements,i,_tail-i); + _tail--; + _size.decrementAndGet(); + } + else + { + System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1); + if (_tail>0) + { + _elements[_capacity]=_elements[0]; + System.arraycopy(_elements,1,_elements,0,_tail-1); + _tail--; + } + else + _tail=_capacity-1; + + _size.decrementAndGet(); + } + + return old; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + @Override + public E set(int index, E e) + { + if (e == null) + throw new NullPointerException(); + + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + + if (index<0 || index>=_size.get()) + throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); + + int i = _head+index; + if (i>=_capacity) + i-=_capacity; + @SuppressWarnings("unchecked") + E old=(E)_elements[i]; + _elements[i]=e; + return old; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + @Override + public void add(int index, E e) + { + if (e == null) + throw new NullPointerException(); + + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + + if (index<0 || index>_size.get()) + throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); + + if (index==_size.get()) + { + add(e); + } + else + { + if (_tail==_head) + if (!grow()) + throw new IllegalStateException("full"); + + int i = _head+index; + if (i>=_capacity) + i-=_capacity; + + _size.incrementAndGet(); + _tail=(_tail+1)%_capacity; + + + if (i<_tail) + { + System.arraycopy(_elements,i,_elements,i+1,_tail-i); + _elements[i]=e; + } + else + { + if (_tail>0) + { + System.arraycopy(_elements,0,_elements,1,_tail); + _elements[0]=_elements[_capacity-1]; + } + + System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1); + _elements[i]=e; + } + } + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + /* ------------------------------------------------------------ */ + private boolean grow() + { + if (_growCapacity<=0) + return false; + + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + final int head=_head; + final int tail=_tail; + final int new_tail; + + Object[] elements=new Object[_capacity+_growCapacity]; + + if (head<tail) + { + new_tail=tail-head; + System.arraycopy(_elements,head,elements,0,new_tail); + } + else if (head>tail || _size.get()>0) + { + new_tail=_capacity+tail-head; + int cut=_capacity-head; + System.arraycopy(_elements,head,elements,0,cut); + System.arraycopy(_elements,0,elements,cut,tail); + } + else + { + new_tail=0; + } + + _elements=elements; + _capacity=_elements.length; + _head=0; + _tail=new_tail; + return true; + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + + } + + /* ------------------------------------------------------------ */ + public int drainTo(Collection<? super E> c) + { + throw new UnsupportedOperationException(); + } + + /* ------------------------------------------------------------ */ + public int drainTo(Collection<? super E> c, int maxElements) + { + throw new UnsupportedOperationException(); + } + + /* ------------------------------------------------------------ */ + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException + { + throw new UnsupportedOperationException(); + } + + /* ------------------------------------------------------------ */ + public void put(E o) throws InterruptedException + { + if (!add(o)) + throw new IllegalStateException("full"); + } + + /* ------------------------------------------------------------ */ + public int remainingCapacity() + { + _tailLock.lock(); + try + { + _headLock.lock(); + try + { + return getCapacity()-size(); + } + finally + { + _headLock.unlock(); + } + } + finally + { + _tailLock.unlock(); + } + } + + + /* ------------------------------------------------------------ */ + long sumOfSpace() + { + // this method exists to stop clever optimisers removing the spacers + return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++; + } +}