view src/org/eclipse/jetty/util/BlockingArrayQueue.java @ 860:626815a4b19b

minor fix
author Franklin Schmidt <fschmidt@gmail.com>
date Tue, 27 Sep 2016 22:20:07 -0600
parents 3428c60d7cfc
children
line wrap: on
line source

//
//  ========================================================================
//  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.util;

import java.util.AbstractList;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;


/* ------------------------------------------------------------ */
/** Queue backed by a circular array.
 * 
 * This queue is uses  a variant of the two lock queue algorithm to
 * provide an efficient queue or list backed by a growable circular
 * array.  This queue also has a partial implementation of 
 * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and 
 * {@link #poll(long, TimeUnit)} methods.  
 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
 * able to grow and provides a blocking put call.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum size that may be allocated), which defaults to 
 * {@link Integer#MAX_VALUE}.
 * 
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    public final int DEFAULT_CAPACITY=128;
    public final int DEFAULT_GROWTH=64;
    private final int _limit;
    private final AtomicInteger _size=new AtomicInteger();
    private final int _growCapacity;
    
    private volatile int _capacity;
    private Object[] _elements;
    
    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private int _head;

    // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
    // TODO verify this has benefits
    private long _space0;
    private long _space1;
    private long _space2;
    private long _space3;
    private long _space4;
    private long _space5;
    private long _space6;
    private long _space7;
    
    private final ReentrantLock _tailLock = new ReentrantLock();
    private int _tail;
    

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue
     * 
     */
    public BlockingArrayQueue()
    {
        _elements=new Object[DEFAULT_CAPACITY];
        _growCapacity=DEFAULT_GROWTH;
        _capacity=_elements.length;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a fixed size partially blocking Queue
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        _elements=new Object[limit];
        _capacity=_elements.length;
        _growCapacity=-1;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /** Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     * @param limit maximum capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException();
        
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    public int getLimit()
    {
        return _limit;
    }
    
    /* ------------------------------------------------------------ */
    @Override
    public boolean add(E e)
    {
        return offer(e);
    }
    
    /* ------------------------------------------------------------ */
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }
    
    /* ------------------------------------------------------------ */
    @SuppressWarnings("unchecked")
    public E peek()
    {
        if (_size.get() == 0)
            return null;
        
        E e = null;
        _headLock.lock(); // Size cannot shrink
        try 
        {
            if (_size.get() > 0) 
                e = (E)_elements[_head];
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    public boolean offer(E e)
    {
        if (e == null) 
            throw new NullPointerException();
        
        boolean not_empty=false;
        _tailLock.lock();  // size cannot grow... only shrink
        try 
        {
            if (_size.get() >= _limit) 
                return false;
            
            // should we expand array?
            if (_size.get()==_capacity)
            {
                _headLock.lock();   // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            not_empty=0==_size.getAndIncrement();
            
        } 
        finally 
        {
            _tailLock.unlock();
        }
        
        if (not_empty)
        {
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }  

        return true;
    }


    /* ------------------------------------------------------------ */
    @SuppressWarnings("unchecked")
    public E poll()
    {
        if (_size.get() == 0)
            return null;
        
        E e = null;
        _headLock.lock(); // Size cannot shrink
        try 
        {
            if (_size.get() > 0) 
            {
                final int head=_head;
                e = (E)_elements[head];
                _elements[head]=null;
                _head=(head+1)%_capacity;
                
                if (_size.decrementAndGet()>0)
                    _notEmpty.signal();
            }
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException
    {
        E e = null;
        _headLock.lockInterruptibly();  // Size cannot shrink
        try 
        {
            try 
            {
                while (_size.get() == 0)
                {
                    _notEmpty.await();
                }
            } 
            catch (InterruptedException ie) 
            {
                _notEmpty.signal();
                throw ie;
            }

            final int head=_head;
            e = (E)_elements[head];
            _elements[head]=null;
            _head=(head+1)%_capacity;

            if (_size.decrementAndGet()>0)
                _notEmpty.signal();
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    @SuppressWarnings("unchecked")
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {
        
        E e = null;

        long nanos = unit.toNanos(time);
        
        _headLock.lockInterruptibly(); // Size cannot shrink
        try 
        {    
            try 
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            } 
            catch (InterruptedException ie) 
            {
                _notEmpty.signal();
                throw ie;
            }

            e = (E)_elements[_head];
            _elements[_head]=null;
            _head=(_head+1)%_capacity;

            if (_size.decrementAndGet()>0)
                _notEmpty.signal();
        } 
        finally 
        {
            _headLock.unlock();
        }
        
        return e;
    }

    /* ------------------------------------------------------------ */
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    @Override
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    @Override
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    @Override
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                return (E)_elements[i];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    
    /* ------------------------------------------------------------ */
    @Override
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];

                if (i<_tail)
                {
                    System.arraycopy(_elements,i+1,_elements,i,_tail-i);
                    _tail--;
                    _size.decrementAndGet();
                }
                else
                {
                    System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
                    if (_tail>0)
                    {
                        _elements[_capacity]=_elements[0];
                        System.arraycopy(_elements,1,_elements,0,_tail-1);
                        _tail--;
                    }
                    else
                        _tail=_capacity-1;

                    _size.decrementAndGet();
                }

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    @Override
    public E set(int index, E e)
    {
        if (e == null) 
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    
    /* ------------------------------------------------------------ */
    @Override
    public void add(int index, E e)
    {
        if (e == null) 
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                if (index==_size.get())
                {
                    add(e);
                }
                else
                {
                    if (_tail==_head)
                        if (!grow())
                            throw new IllegalStateException("full");

                    int i = _head+index;
                    if (i>=_capacity)
                        i-=_capacity;

                    _size.incrementAndGet();
                    _tail=(_tail+1)%_capacity;


                    if (i<_tail)
                    {
                        System.arraycopy(_elements,i,_elements,i+1,_tail-i);
                        _elements[i]=e;
                    }
                    else
                    {
                        if (_tail>0)
                        {
                            System.arraycopy(_elements,0,_elements,1,_tail);
                            _elements[0]=_elements[_capacity-1];
                        }

                        System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
                        _elements[i]=e;
                    }
                }
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int head=_head;
                final int tail=_tail;
                final int new_tail;

                Object[] elements=new Object[_capacity+_growCapacity];

                if (head<tail)
                {
                    new_tail=tail-head;
                    System.arraycopy(_elements,head,elements,0,new_tail);
                }
                else if (head>tail || _size.get()>0)
                {
                    new_tail=_capacity+tail-head;
                    int cut=_capacity-head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    new_tail=0;
                }

                _elements=elements;
                _capacity=_elements.length;
                _head=0;
                _tail=new_tail; 
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }

    }

    /* ------------------------------------------------------------ */
    public int drainTo(Collection<? super E> c)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    public void put(E o) throws InterruptedException
    {
        if (!add(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                return getCapacity()-size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }
    

    /* ------------------------------------------------------------ */
    long sumOfSpace()
    {
        // this method exists to stop clever optimisers removing the spacers
        return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++; 
    }
}