comparison src/org/eclipse/jetty/util/BlockingArrayQueue.java @ 802:3428c60d7cfc

replace jetty jars with source
author Franklin Schmidt <fschmidt@gmail.com>
date Wed, 07 Sep 2016 21:15:48 -0600
parents
children
comparison
equal deleted inserted replaced
801:6a21393191c1 802:3428c60d7cfc
1 //
2 // ========================================================================
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
4 // ------------------------------------------------------------------------
5 // All rights reserved. This program and the accompanying materials
6 // are made available under the terms of the Eclipse Public License v1.0
7 // and Apache License v2.0 which accompanies this distribution.
8 //
9 // The Eclipse Public License is available at
10 // http://www.eclipse.org/legal/epl-v10.html
11 //
12 // The Apache License v2.0 is available at
13 // http://www.opensource.org/licenses/apache2.0.php
14 //
15 // You may elect to redistribute this code under either of these licenses.
16 // ========================================================================
17 //
18
19 package org.eclipse.jetty.util;
20
21 import java.util.AbstractList;
22 import java.util.Collection;
23 import java.util.NoSuchElementException;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.TimeUnit;
26 import java.util.concurrent.atomic.AtomicInteger;
27 import java.util.concurrent.locks.Condition;
28 import java.util.concurrent.locks.ReentrantLock;
29
30
31 /* ------------------------------------------------------------ */
/**
 * Queue backed by a circular array.
 * <p>
 * This queue uses a variant of the two lock queue algorithm to
 * provide an efficient queue or list backed by a growable circular
 * array. It is only a partial implementation of
 * {@link java.util.concurrent.BlockingQueue}: {@link #take()} and
 * {@link #poll(long, TimeUnit)} block while the queue is empty, whereas
 * {@link #put(Object)} never blocks (it throws when the element cannot be
 * added) and {@link #offer(Object, long, TimeUnit)} and the
 * <code>drainTo</code> methods are unsupported.
 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
 * able to grow.
 * <p>
 * The queue has both a capacity (the size of the array currently allocated)
 * and a limit (the maximum size that may be allocated), which defaults to
 * {@link Integer#MAX_VALUE}.
 * <p>
 * Locking: removals from the head hold the head lock, appends to the tail
 * hold the tail lock, and every operation that touches both ends (indexed
 * access, growing, clearing) holds the tail lock and then the head lock, in
 * that order.
 *
 * @param <E> The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    /** Initial capacity used by the no-argument constructor. */
    public static final int DEFAULT_CAPACITY=128;
    /** Growth increment used by the no-argument constructor. */
    public static final int DEFAULT_GROWTH=64;

    private final int _limit;
    private final AtomicInteger _size=new AtomicInteger();
    private final int _growCapacity;

    private volatile int _capacity;
    private Object[] _elements;

    private final ReentrantLock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private int _head;

    // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
    // TODO verify this has benefits
    private long _space0;
    private long _space1;
    private long _space2;
    private long _space3;
    private long _space4;
    private long _space5;
    private long _space6;
    private long _space7;

    private final ReentrantLock _tailLock = new ReentrantLock();
    private int _tail;


    /* ------------------------------------------------------------ */
    /**
     * Create a growing partially blocking Queue with
     * {@link #DEFAULT_CAPACITY}, {@link #DEFAULT_GROWTH} and no limit.
     */
    public BlockingArrayQueue()
    {
        _elements=new Object[DEFAULT_CAPACITY];
        _growCapacity=DEFAULT_GROWTH;
        _capacity=_elements.length;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /**
     * Create a fixed size partially blocking Queue.
     * @param limit The initial capacity and the limit.
     */
    public BlockingArrayQueue(int limit)
    {
        _elements=new Object[limit];
        _capacity=_elements.length;
        _growCapacity=-1; // a non-positive increment disables growing
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * Create a growing partially blocking Queue without a limit.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     */
    public BlockingArrayQueue(int capacity,int growBy)
    {
        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=Integer.MAX_VALUE;
    }

    /* ------------------------------------------------------------ */
    /**
     * Create a growing limited partially blocking Queue.
     * @param capacity Initial capacity
     * @param growBy Incremental capacity.
     * @param limit maximum capacity.
     * @throws IllegalArgumentException if capacity is greater than limit
     */
    public BlockingArrayQueue(int capacity,int growBy,int limit)
    {
        if (capacity>limit)
            throw new IllegalArgumentException();

        _elements=new Object[capacity];
        _capacity=_elements.length;
        _growCapacity=growBy;
        _limit=limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the length of the currently allocated array
     */
    public int getCapacity()
    {
        return _capacity;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the maximum number of elements this queue accepts
     */
    public int getLimit()
    {
        return _limit;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element by delegating to {@link #offer(Object)}.
     * Note that, unlike the general {@link java.util.Queue#add(Object)}
     * contract, a full queue is reported by returning false rather than
     * by throwing.
     * @param e the element to append, must not be null
     * @return true if the element was appended, false if the queue is full
     */
    @Override
    public boolean add(E e)
    {
        return offer(e);
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves, but does not remove, the head of the queue.
     * @return the head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    public E element()
    {
        E e = peek();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves, but does not remove, the head of the queue.
     * @return the head of the queue, or null if the queue is empty
     */
    @SuppressWarnings("unchecked")
    public E peek()
    {
        // cheap unlocked test first, repeated below under the lock
        if (_size.get() == 0)
            return null;

        E e = null;
        _headLock.lock(); // Size cannot shrink
        try
        {
            if (_size.get() > 0)
                e = (E)_elements[_head];
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element at the tail without blocking, growing the
     * array if it is full and growing is permitted.
     * @param e the element to append, must not be null
     * @return true if the element was appended, false if the limit has
     * been reached or the array is full and cannot grow
     * @throws NullPointerException if e is null
     */
    public boolean offer(E e)
    {
        if (e == null)
            throw new NullPointerException();

        boolean not_empty=false;
        _tailLock.lock();  // size cannot grow... only shrink
        try
        {
            if (_size.get() >= _limit)
                return false;

            // should we expand array?
            if (_size.get()==_capacity)
            {
                _headLock.lock();   // Need to grow array
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // add the element
            _elements[_tail]=e;
            _tail=(_tail+1)%_capacity;

            // remember whether the queue was empty so a waiter can be woken
            not_empty=0==_size.getAndIncrement();

        }
        finally
        {
            _tailLock.unlock();
        }

        if (not_empty)
        {
            // the condition belongs to the head lock, which must be held to signal
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }

        return true;
    }


    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of the queue without blocking.
     * @return the head of the queue, or null if the queue is empty
     */
    @SuppressWarnings("unchecked")
    public E poll()
    {
        if (_size.get() == 0)
            return null;

        E e = null;
        _headLock.lock(); // Size cannot shrink
        try
        {
            if (_size.get() > 0)
            {
                final int head=_head;
                e = (E)_elements[head];
                _elements[head]=null;
                _head=(head+1)%_capacity;

                // pass the wake-up on while elements remain
                if (_size.decrementAndGet()>0)
                    _notEmpty.signal();
            }
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if no elements are present on this queue.
     * @return the head of this queue
     * @throws InterruptedException if interrupted while waiting.
     */
    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException
    {
        E e = null;
        _headLock.lockInterruptibly();  // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    _notEmpty.await();
                }
            }
            catch (InterruptedException ie)
            {
                // hand any wake-up this thread may have consumed to another waiter
                _notEmpty.signal();
                throw ie;
            }

            final int head=_head;
            e = (E)_elements[head];
            _elements[head]=null;
            _head=(head+1)%_capacity;

            if (_size.decrementAndGet()>0)
                _notEmpty.signal();
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of this queue, waiting
     * if necessary up to the specified wait time if no elements are
     * present on this queue.
     * @param time how long to wait before giving up, in units of
     * <tt>unit</tt>
     * @param unit a <tt>TimeUnit</tt> determining how to interpret the
     * <tt>timeout</tt> parameter
     * @return the head of this queue, or <tt>null</tt> if the
     * specified waiting time elapses before an element is present.
     * @throws InterruptedException if interrupted while waiting.
     */
    @SuppressWarnings("unchecked")
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {

        E e = null;

        long nanos = unit.toNanos(time);

        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    if (nanos<=0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            }
            catch (InterruptedException ie)
            {
                // hand any wake-up this thread may have consumed to another waiter
                _notEmpty.signal();
                throw ie;
            }

            e = (E)_elements[_head];
            _elements[_head]=null;
            _head=(_head+1)%_capacity;

            if (_size.decrementAndGet()>0)
                _notEmpty.signal();
        }
        finally
        {
            _headLock.unlock();
        }

        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Retrieves and removes the head of the queue.
     * @return the head of the queue
     * @throws NoSuchElementException if the queue is empty
     */
    public E remove()
    {
        E e=poll();
        if (e==null)
            throw new NoSuchElementException();
        return e;
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes all elements. The backing array keeps its current
     * capacity but no longer references the removed elements.
     */
    @Override
    public void clear()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                // drop the references so the removed elements can be collected
                java.util.Arrays.fill(_elements,null);
                _head=0;
                _tail=0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * @return true if the queue holds no elements
     */
    @Override
    public boolean isEmpty()
    {
        return _size.get()==0;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of elements currently in the queue
     */
    @Override
    public int size()
    {
        return _size.get();
    }

    /* ------------------------------------------------------------ */
    /**
     * Returns the element at the given position, where position 0 is
     * the head of the queue.
     * @param index position counted from the head
     * @return the element at that position
     * @throws IndexOutOfBoundsException if index is negative or not less than the size
     */
    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
                // translate the logical position into a slot of the circular array
                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                return (E)_elements[i];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Removes the element at the given position, where position 0 is
     * the head of the queue. The elements behind it are shifted one
     * slot towards the head; the head index itself does not move.
     * @param index position counted from the head
     * @return the removed element
     * @throws IndexOutOfBoundsException if index is negative or not less than the size
     */
    @Override
    public E remove(int index)
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];

                if (i<_tail)
                {
                    // the slot lies in the run that ends at the tail: close the gap
                    System.arraycopy(_elements,i+1,_elements,i,_tail-i-1);
                    _tail--;
                }
                else
                {
                    // the slot lies in the run [head,capacity): close the gap there ...
                    System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
                    if (_tail>0)
                    {
                        // ... and pull the wrapped run [0,tail) back around the end of the array
                        _elements[_capacity-1]=_elements[0];
                        System.arraycopy(_elements,1,_elements,0,_tail-1);
                        _tail--;
                    }
                    else
                        _tail=_capacity-1;
                }

                // the new tail slot is the one just vacated: release its stale reference
                _elements[_tail]=null;
                _size.decrementAndGet();

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the element at the given position, where position 0 is
     * the head of the queue.
     * @param index position counted from the head
     * @param e the replacement element, must not be null
     * @return the element previously at that position
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is negative or not less than the size
     */
    @Override
    public E set(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {

                if (index<0 || index>=_size.get())
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                int i = _head+index;
                if (i>=_capacity)
                    i-=_capacity;
                @SuppressWarnings("unchecked")
                E old=(E)_elements[i];
                _elements[i]=e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Inserts the element at the given position, where position 0 is
     * the head of the queue and position {@link #size()} appends. The
     * elements from that position onwards are shifted one slot towards
     * the tail.
     * @param index position counted from the head
     * @param e the element to insert, must not be null
     * @throws NullPointerException if e is null
     * @throws IndexOutOfBoundsException if index is negative or greater than the size
     * @throws IllegalStateException if the limit has been reached or the
     * array is full and cannot grow
     */
    @Override
    public void add(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int size=_size.get();

                if (index<0 || index>size)
                    throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");

                if (index==size)
                {
                    // plain append; this method is void, so a refusal must not pass silently
                    if (!offer(e))
                        throw new IllegalStateException("full");
                }
                else
                {
                    if (size>=_limit)
                        throw new IllegalStateException("full");

                    // size>0 here, so equal indexes mean the array is full
                    if (_tail==_head)
                        if (!grow())
                            throw new IllegalStateException("full");

                    int i = _head+index;
                    if (i>=_capacity)
                        i-=_capacity;

                    // shift lengths are computed from the tail as it was before
                    // this insert, so only occupied slots are moved
                    final int tail=_tail;
                    _tail=(tail+1)%_capacity;

                    if (i<tail)
                    {
                        // the slot lies in the run that ends at the tail: open a gap
                        System.arraycopy(_elements,i,_elements,i+1,tail-i);
                    }
                    else
                    {
                        // the slot lies in the run [head,capacity): make room at index 0 ...
                        if (tail>0)
                            System.arraycopy(_elements,0,_elements,1,tail);
                        // ... wrap the last element of the array around to it ...
                        _elements[0]=_elements[_capacity-1];
                        // ... and open a gap in the upper run
                        System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
                    }
                    _elements[i]=e;
                    _size.incrementAndGet();
                }
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /* ------------------------------------------------------------ */
    /**
     * Replaces the backing array with a larger one, copying the
     * elements so that the head ends up at index 0. The new capacity is
     * the old capacity plus the growth increment, capped at the limit.
     * @return true if the array was enlarged, false if growing is
     * disabled or the capacity already equals the limit
     */
    private boolean grow()
    {
        if (_growCapacity<=0)
            return false;

        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                final int head=_head;
                final int tail=_tail;
                final int capacity=_capacity;
                final int new_tail;

                // long arithmetic so the sum cannot overflow; never allocate beyond the limit
                final int new_capacity=(int)Math.min((long)_limit,(long)capacity+_growCapacity);
                if (new_capacity<=capacity)
                    return false;

                Object[] elements=new Object[new_capacity];

                if (head<tail)
                {
                    // one contiguous run [head,tail)
                    new_tail=tail-head;
                    System.arraycopy(_elements,head,elements,0,new_tail);
                }
                else if (head>tail || _size.get()>0)
                {
                    // wrapped (or completely full): copy [head,capacity) then [0,tail)
                    new_tail=capacity+tail-head;
                    int cut=capacity-head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    // empty
                    new_tail=0;
                }

                _elements=elements;
                _capacity=_elements.length;
                _head=0;
                _tail=new_tail;
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }

    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public int drainTo(Collection<? super E> c)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Not supported.
     * @throws UnsupportedOperationException always
     */
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        throw new UnsupportedOperationException();
    }

    /* ------------------------------------------------------------ */
    /**
     * Appends the element without waiting for space.
     * @param o the element to append, must not be null
     * @throws IllegalStateException if the element could not be added
     * because the queue is full
     * @throws InterruptedException declared by the interface; this
     * implementation does not wait
     */
    public void put(E o) throws InterruptedException
    {
        if (!add(o))
            throw new IllegalStateException("full");
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of free slots in the currently allocated
     * array, i.e. {@link #getCapacity()} minus {@link #size()}
     */
    public int remainingCapacity()
    {
        _tailLock.lock();
        try
        {
            _headLock.lock();
            try
            {
                return getCapacity()-size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }


    /* ------------------------------------------------------------ */
    long sumOfSpace()
    {
        // this method exists to stop clever optimisers removing the spacers
        return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
    }
}