Mercurial Hosting > luan
annotate src/org/eclipse/jetty/io/nio/SelectorManager.java @ 944:1d24b6e422fa
simplify SelectorManager
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Tue, 11 Oct 2016 20:16:03 -0600 |
parents | 96f60ce98949 |
children | f5aefdc4a81a |
rev | line source |
---|---|
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
1 // |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
2 // ======================================================================== |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
3 // Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd. |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
4 // ------------------------------------------------------------------------ |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
5 // All rights reserved. This program and the accompanying materials |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
6 // are made available under the terms of the Eclipse Public License v1.0 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
7 // and Apache License v2.0 which accompanies this distribution. |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
8 // |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
9 // The Eclipse Public License is available at |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
10 // http://www.eclipse.org/legal/epl-v10.html |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
11 // |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
12 // The Apache License v2.0 is available at |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
13 // http://www.opensource.org/licenses/apache2.0.php |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
14 // |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
15 // You may elect to redistribute this code under either of these licenses. |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
16 // ======================================================================== |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
17 // |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
18 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
19 package org.eclipse.jetty.io.nio; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
20 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
21 import java.io.IOException; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
22 import java.nio.channels.CancelledKeyException; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
23 import java.nio.channels.Channel; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
24 import java.nio.channels.ClosedSelectorException; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
25 import java.nio.channels.SelectableChannel; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
26 import java.nio.channels.SelectionKey; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
27 import java.nio.channels.Selector; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
28 import java.nio.channels.ServerSocketChannel; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
29 import java.nio.channels.SocketChannel; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
30 import java.util.ArrayList; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
31 import java.util.List; |
944
1d24b6e422fa
simplify SelectorManager
Franklin Schmidt <fschmidt@gmail.com>
parents:
943
diff
changeset
|
32 import java.util.Collections; |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
33 import java.util.Set; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
34 import java.util.concurrent.ConcurrentHashMap; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
35 import java.util.concurrent.ConcurrentLinkedQueue; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
36 import java.util.concurrent.ConcurrentMap; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
37 import java.util.concurrent.CountDownLatch; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
38 import java.util.concurrent.TimeUnit; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
39 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
40 import org.eclipse.jetty.io.AsyncEndPoint; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
41 import org.eclipse.jetty.io.ConnectedEndPoint; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
42 import org.eclipse.jetty.io.Connection; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
43 import org.eclipse.jetty.io.EndPoint; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
44 import org.eclipse.jetty.util.TypeUtil; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
45 import org.eclipse.jetty.util.component.AbstractLifeCycle; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
46 import org.eclipse.jetty.util.component.AggregateLifeCycle; |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
47 import org.eclipse.jetty.util.component.Dumpable; |
820
8e9db0bbf4f9
remove org.eclipse.jetty.util.log and upgrade slf4j
Franklin Schmidt <fschmidt@gmail.com>
parents:
802
diff
changeset
|
48 import org.slf4j.Logger; |
8e9db0bbf4f9
remove org.eclipse.jetty.util.log and upgrade slf4j
Franklin Schmidt <fschmidt@gmail.com>
parents:
802
diff
changeset
|
49 import org.slf4j.LoggerFactory; |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
50 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
51 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
52 /* ------------------------------------------------------------ */ |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
53 /** |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
54 * The Selector Manager manages and number of SelectSets to allow |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
55 * NIO scheduling to scale to large numbers of connections. |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
56 * <p> |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
57 */ |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
58 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
59 { |
865 | 60 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
61 |
865 | 62 private static final int __MONITOR_PERIOD=Integer.getInteger("org.eclipse.jetty.io.nio.MONITOR_PERIOD",1000).intValue(); |
63 private static final int __MAX_SELECTS=Integer.getInteger("org.eclipse.jetty.io.nio.MAX_SELECTS",100000).intValue(); | |
64 private static final int __BUSY_PAUSE=Integer.getInteger("org.eclipse.jetty.io.nio.BUSY_PAUSE",50).intValue(); | |
65 private static final int __IDLE_TICK=Integer.getInteger("org.eclipse.jetty.io.nio.IDLE_TICK",400).intValue(); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
66 |
865 | 67 private int _maxIdleTime; |
68 private long _lowResourcesConnections; | |
69 private SelectSet[] _selectSet; | |
70 private int _selectSets=1; | |
71 private volatile int _set=0; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
72 |
865 | 73 /* ------------------------------------------------------------ */ |
74 /** | |
75 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. | |
76 * @see #setLowResourcesMaxIdleTime(long) | |
77 */ | |
78 public void setMaxIdleTime(long maxIdleTime) | |
79 { | |
80 _maxIdleTime=(int)maxIdleTime; | |
81 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
82 |
865 | 83 /* ------------------------------------------------------------ */ |
84 /** | |
85 * @param selectSets number of select sets to create | |
86 */ | |
87 public void setSelectSets(int selectSets) | |
88 { | |
89 long lrc = _lowResourcesConnections * _selectSets; | |
90 _selectSets=selectSets; | |
91 _lowResourcesConnections=lrc/_selectSets; | |
92 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
93 |
865 | 94 /* ------------------------------------------------------------ */ |
95 /** | |
96 * @return the max idle time | |
97 */ | |
98 public long getMaxIdleTime() | |
99 { | |
100 return _maxIdleTime; | |
101 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
102 |
865 | 103 /* ------------------------------------------------------------ */ |
104 /** | |
105 * @return the number of select sets in use | |
106 */ | |
107 public int getSelectSets() | |
108 { | |
109 return _selectSets; | |
110 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
111 |
865 | 112 /* ------------------------------------------------------------ */ |
113 /** | |
114 * @param i | |
115 * @return The select set | |
116 */ | |
117 public SelectSet getSelectSet(int i) | |
118 { | |
119 return _selectSet[i]; | |
120 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
121 |
865 | 122 /* ------------------------------------------------------------ */ |
123 /** Register a channel | |
124 * @param channel | |
125 */ | |
126 public void register(SocketChannel channel) | |
127 { | |
128 // The ++ increment here is not atomic, but it does not matter. | |
129 // so long as the value changes sometimes, then connections will | |
130 // be distributed over the available sets. | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
131 |
865 | 132 int s=_set++; |
133 if (s<0) | |
134 s=-s; | |
135 s=s%_selectSets; | |
136 SelectSet[] sets=_selectSet; | |
137 if (sets!=null) | |
138 { | |
139 SelectSet set=sets[s]; | |
140 set.addChange(channel); | |
141 set.wakeup(); | |
142 } | |
143 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
144 |
865 | 145 /* ------------------------------------------------------------ */ |
146 /** Register a {@link ServerSocketChannel} | |
147 * @param acceptChannel | |
148 */ | |
149 public void register(ServerSocketChannel acceptChannel) | |
150 { | |
151 int s=_set++; | |
152 if (s<0) | |
153 s=-s; | |
154 s=s%_selectSets; | |
155 SelectSet set=_selectSet[s]; | |
156 set.addChange(acceptChannel); | |
157 set.wakeup(); | |
158 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
159 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
160 |
865 | 161 /* ------------------------------------------------------------ */ |
162 /** | |
163 * @return the lowResourcesConnections | |
164 */ | |
165 public long getLowResourcesConnections() | |
166 { | |
167 return _lowResourcesConnections*_selectSets; | |
168 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
169 |
865 | 170 /* ------------------------------------------------------------ */ |
171 /** | |
172 * Set the number of connections, which if exceeded places this manager in low resources state. | |
173 * This is not an exact measure as the connection count is averaged over the select sets. | |
174 * @param lowResourcesConnections the number of connections | |
175 * @see #setLowResourcesMaxIdleTime(long) | |
176 */ | |
177 public void setLowResourcesConnections(long lowResourcesConnections) | |
178 { | |
179 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; | |
180 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
181 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
182 |
865 | 183 public abstract void execute(Runnable task); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
184 |
865 | 185 /* ------------------------------------------------------------ */ |
186 /* (non-Javadoc) | |
187 * @see org.eclipse.component.AbstractLifeCycle#doStart() | |
188 */ | |
189 @Override | |
190 protected void doStart() throws Exception | |
191 { | |
192 _selectSet = new SelectSet[_selectSets]; | |
193 for (int i=0;i<_selectSet.length;i++) | |
194 _selectSet[i]= new SelectSet(i); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
195 |
865 | 196 super.doStart(); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
197 |
865 | 198 // start a thread to Select |
199 for (int i=0;i<getSelectSets();i++) | |
200 { | |
201 final int id=i; | |
202 execute(new Runnable() | |
203 { | |
204 public void run() | |
205 { | |
206 String name=Thread.currentThread().getName(); | |
207 try | |
208 { | |
209 SelectSet[] sets=_selectSet; | |
210 if (sets==null) | |
211 return; | |
212 SelectSet set=sets[id]; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
213 |
865 | 214 Thread.currentThread().setName(name+" Selector"+id); |
215 LOG.debug("Starting {} on {}",Thread.currentThread(),this); | |
216 while (isRunning()) | |
217 { | |
218 try | |
219 { | |
220 set.doSelect(); | |
221 } | |
222 catch(IOException e) | |
223 { | |
224 LOG.trace("",e); | |
225 } | |
226 catch(Exception e) | |
227 { | |
228 LOG.warn("",e); | |
229 } | |
230 } | |
231 } | |
232 finally | |
233 { | |
234 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); | |
235 Thread.currentThread().setName(name); | |
236 } | |
237 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
238 |
865 | 239 }); |
240 } | |
241 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
242 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
243 |
865 | 244 /* ------------------------------------------------------------------------------- */ |
245 @Override | |
246 protected void doStop() throws Exception | |
247 { | |
248 SelectSet[] sets= _selectSet; | |
249 _selectSet=null; | |
250 if (sets!=null) | |
251 { | |
252 for (SelectSet set : sets) | |
253 { | |
254 if (set!=null) | |
255 set.stop(); | |
256 } | |
257 } | |
258 super.doStop(); | |
259 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
260 |
865 | 261 /* ------------------------------------------------------------ */ |
262 /** | |
263 * @param endpoint | |
264 */ | |
265 protected abstract void endPointClosed(SelectChannelEndPoint endpoint); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
266 |
865 | 267 /* ------------------------------------------------------------------------------- */ |
268 public abstract AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
269 |
865 | 270 /* ------------------------------------------------------------ */ |
271 /** | |
272 * Create a new end point | |
273 * @param channel | |
274 * @param selectSet | |
275 * @param sKey the selection key | |
276 * @return the new endpoint {@link SelectChannelEndPoint} | |
277 * @throws IOException | |
278 */ | |
279 protected abstract SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectorManager.SelectSet selectSet, SelectionKey sKey) throws IOException; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
280 |
865 | 281 /* ------------------------------------------------------------------------------- */ |
282 protected void connectionFailed(SocketChannel channel,Throwable ex,Object attachment) | |
283 { | |
284 LOG.warn(ex+","+channel+","+attachment); | |
285 LOG.debug("",ex); | |
286 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
287 |
865 | 288 /* ------------------------------------------------------------ */ |
289 public String dump() | |
290 { | |
291 return AggregateLifeCycle.dump(this); | |
292 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
293 |
865 | 294 /* ------------------------------------------------------------ */ |
295 public void dump(Appendable out, String indent) throws IOException | |
296 { | |
297 AggregateLifeCycle.dumpObject(out,this); | |
298 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); | |
299 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
300 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
301 |
865 | 302 /* ------------------------------------------------------------------------------- */ |
303 /* ------------------------------------------------------------------------------- */ | |
304 /* ------------------------------------------------------------------------------- */ | |
305 public class SelectSet implements Dumpable | |
306 { | |
307 private final int _setID; | |
943 | 308 private volatile long _now = System.currentTimeMillis(); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
309 |
865 | 310 private final ConcurrentLinkedQueue<Object> _changes = new ConcurrentLinkedQueue<Object>(); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
311 |
865 | 312 private volatile Selector _selector; |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
313 |
865 | 314 private volatile Thread _selecting; |
315 private int _busySelects; | |
316 private long _monitorNext; | |
317 private boolean _pausing; | |
318 private boolean _paused; | |
319 private volatile long _idleTick; | |
320 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
321 |
865 | 322 SelectSet(int acceptorID) throws Exception |
323 { | |
324 _setID=acceptorID; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
325 |
865 | 326 _idleTick = System.currentTimeMillis(); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
327 |
865 | 328 // create a selector; |
329 _selector = Selector.open(); | |
330 _monitorNext=System.currentTimeMillis()+__MONITOR_PERIOD; | |
331 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
332 |
865 | 333 public void addChange(Object change) |
334 { | |
335 _changes.add(change); | |
336 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
337 |
865 | 338 /* ------------------------------------------------------------ */ |
339 /** | |
340 * Select and dispatch tasks found from changes and the selector. | |
341 * | |
342 * @throws IOException | |
343 */ | |
344 public void doSelect() throws IOException | |
345 { | |
346 try | |
347 { | |
348 _selecting=Thread.currentThread(); | |
349 final Selector selector=_selector; | |
350 // Stopped concurrently ? | |
351 if (selector == null) | |
352 return; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
353 |
865 | 354 // Make any key changes required |
355 Object change; | |
356 int changes=_changes.size(); | |
357 while (changes-->0 && (change=_changes.poll())!=null) | |
358 { | |
359 Channel ch=null; | |
360 SelectionKey key=null; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
361 |
865 | 362 try |
363 { | |
364 if (change instanceof EndPoint) | |
365 { | |
366 // Update the operations for a key. | |
367 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)change; | |
368 ch=endpoint.getChannel(); | |
369 endpoint.doUpdateKey(); | |
370 } | |
371 else if (change instanceof SocketChannel) | |
372 { | |
373 // Newly registered channel | |
374 final SocketChannel channel=(SocketChannel)change; | |
375 ch=channel; | |
376 key = channel.register(selector,SelectionKey.OP_READ,null); | |
377 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | |
378 key.attach(endpoint); | |
379 endpoint.schedule(); | |
380 } | |
381 else if (change instanceof Runnable) | |
382 { | |
383 execute((Runnable)change); | |
384 } | |
385 else | |
386 throw new IllegalArgumentException(change.toString()); | |
387 } | |
388 catch (CancelledKeyException e) | |
389 { | |
390 LOG.trace("",e); | |
391 } | |
392 catch (Throwable e) | |
393 { | |
394 if (isRunning()) | |
395 LOG.warn("",e); | |
396 else | |
397 LOG.debug("",e); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
398 |
865 | 399 try |
400 { | |
401 if (ch!=null) | |
402 ch.close(); | |
403 } | |
404 catch(IOException e2) | |
405 { | |
406 LOG.debug("",e2); | |
407 } | |
408 } | |
409 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
410 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
411 |
865 | 412 // Do and instant select to see if any connections can be handled. |
413 int selected=selector.selectNow(); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
414 |
943 | 415 _now = System.currentTimeMillis(); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
416 |
865 | 417 // if no immediate things to do |
418 if (selected==0 && selector.selectedKeys().isEmpty()) | |
419 { | |
420 // If we are in pausing mode | |
421 if (_pausing) | |
422 { | |
423 try | |
424 { | |
425 Thread.sleep(__BUSY_PAUSE); // pause to reduce impact of busy loop | |
426 } | |
427 catch(InterruptedException e) | |
428 { | |
429 LOG.trace("",e); | |
430 } | |
943 | 431 _now = System.currentTimeMillis(); |
865 | 432 } |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
433 |
865 | 434 // workout how long to wait in select |
435 long wait = _changes.size()==0?__IDLE_TICK:0L; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
436 |
865 | 437 // If we should wait with a select |
438 if (wait>0) | |
439 { | |
943 | 440 long before = _now; |
865 | 441 selector.select(wait); |
943 | 442 _now = System.currentTimeMillis(); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
443 |
865 | 444 // If we are monitoring for busy selector |
445 // and this select did not wait more than 1ms | |
943 | 446 if (__MONITOR_PERIOD>0 && _now-before <=1) |
865 | 447 { |
448 // count this as a busy select and if there have been too many this monitor cycle | |
449 if (++_busySelects>__MAX_SELECTS) | |
450 { | |
451 // Start injecting pauses | |
452 _pausing=true; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
453 |
865 | 454 // if this is the first pause |
455 if (!_paused) | |
456 { | |
457 // Log and dump some status | |
458 _paused=true; | |
459 LOG.warn("Selector {} is too busy, pausing!",this); | |
460 } | |
461 } | |
462 } | |
463 } | |
464 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
465 |
865 | 466 // have we been destroyed while sleeping |
467 if (_selector==null || !selector.isOpen()) | |
468 return; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
469 |
865 | 470 // Look for things to do |
471 for (SelectionKey key: selector.selectedKeys()) | |
472 { | |
473 SocketChannel channel=null; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
474 |
865 | 475 try |
476 { | |
477 if (!key.isValid()) | |
478 { | |
479 key.cancel(); | |
480 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); | |
481 if (endpoint != null) | |
482 endpoint.doUpdateKey(); | |
483 continue; | |
484 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
485 |
865 | 486 Object att = key.attachment(); |
487 if (att instanceof SelectChannelEndPoint) | |
488 { | |
489 if (key.isReadable()||key.isWritable()) | |
490 ((SelectChannelEndPoint)att).schedule(); | |
491 } | |
492 else if (key.isConnectable()) | |
493 { | |
494 // Complete a connection of a registered channel | |
495 channel = (SocketChannel)key.channel(); | |
496 boolean connected=false; | |
497 try | |
498 { | |
499 connected=channel.finishConnect(); | |
500 } | |
501 catch(Exception e) | |
502 { | |
503 connectionFailed(channel,e,att); | |
504 } | |
505 finally | |
506 { | |
507 if (connected) | |
508 { | |
509 key.interestOps(SelectionKey.OP_READ); | |
510 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | |
511 key.attach(endpoint); | |
512 endpoint.schedule(); | |
513 } | |
514 else | |
515 { | |
516 key.cancel(); | |
517 channel.close(); | |
518 } | |
519 } | |
520 } | |
521 else | |
522 { | |
523 // Wrap readable registered channel in an endpoint | |
524 channel = (SocketChannel)key.channel(); | |
525 SelectChannelEndPoint endpoint = createEndPoint(channel,key); | |
526 key.attach(endpoint); | |
527 if (key.isReadable()) | |
528 endpoint.schedule(); | |
529 } | |
530 key = null; | |
531 } | |
532 catch (CancelledKeyException e) | |
533 { | |
534 LOG.trace("",e); | |
535 } | |
536 catch (Exception e) | |
537 { | |
538 if (isRunning()) | |
539 LOG.warn("",e); | |
540 else | |
541 LOG.trace("",e); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
542 |
865 | 543 try |
544 { | |
545 if (channel!=null) | |
546 channel.close(); | |
547 } | |
548 catch(IOException e2) | |
549 { | |
550 LOG.debug("",e2); | |
551 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
552 |
865 | 553 if (key != null && !(key.channel() instanceof ServerSocketChannel) && key.isValid()) |
554 key.cancel(); | |
555 } | |
556 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
557 |
865 | 558 // Everything always handled |
559 selector.selectedKeys().clear(); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
560 |
943 | 561 _now = System.currentTimeMillis(); |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
562 |
865 | 563 // Idle tick |
943 | 564 if (_now-_idleTick>__IDLE_TICK) |
865 | 565 { |
943 | 566 _idleTick = _now; |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
567 |
865 | 568 final long idle_now=((_lowResourcesConnections>0 && selector.keys().size()>_lowResourcesConnections)) |
943 | 569 ?(_now+_maxIdleTime) |
570 :_now; | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
571 |
865 | 572 execute(new Runnable() |
573 { | |
574 public void run() | |
575 { | |
576 for (SelectChannelEndPoint endp:_endPoints.keySet()) | |
577 { | |
578 endp.checkIdleTimestamp(idle_now); | |
579 } | |
580 } | |
581 public String toString() {return "Idle-"+super.toString();} | |
582 }); | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
583 |
865 | 584 } |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
585 |
865 | 586 // Reset busy select monitor counts |
943 | 587 if (__MONITOR_PERIOD>0 && _now>_monitorNext) |
865 | 588 { |
589 _busySelects=0; | |
590 _pausing=false; | |
943 | 591 _monitorNext=_now+__MONITOR_PERIOD; |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
592 |
865 | 593 } |
594 } | |
595 catch (ClosedSelectorException e) | |
596 { | |
597 if (isRunning()) | |
598 LOG.warn("",e); | |
599 else | |
600 LOG.trace("",e); | |
601 } | |
602 catch (CancelledKeyException e) | |
603 { | |
604 LOG.trace("",e); | |
605 } | |
606 finally | |
607 { | |
608 _selecting=null; | |
609 } | |
610 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
611 |
865 | 612 public SelectorManager getManager() |
613 { | |
614 return SelectorManager.this; | |
615 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
616 |
865 | 617 public long getNow() |
618 { | |
943 | 619 return _now; |
865 | 620 } |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
621 |
865 | 622 public void wakeup() |
623 { | |
944
1d24b6e422fa
simplify SelectorManager
Franklin Schmidt <fschmidt@gmail.com>
parents:
943
diff
changeset
|
624 Selector selector = _selector; |
1d24b6e422fa
simplify SelectorManager
Franklin Schmidt <fschmidt@gmail.com>
parents:
943
diff
changeset
|
625 if (selector!=null) |
1d24b6e422fa
simplify SelectorManager
Franklin Schmidt <fschmidt@gmail.com>
parents:
943
diff
changeset
|
626 selector.wakeup(); |
865 | 627 } |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
628 |
865 | 629 private SelectChannelEndPoint createEndPoint(SocketChannel channel, SelectionKey sKey) throws IOException |
630 { | |
631 SelectChannelEndPoint endp = newEndPoint(channel,this,sKey); | |
632 LOG.debug("created {}",endp); | |
633 _endPoints.put(endp,this); | |
634 return endp; | |
635 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
636 |
865 | 637 public void destroyEndPoint(SelectChannelEndPoint endp) |
638 { | |
639 LOG.debug("destroyEndPoint {}",endp); | |
640 _endPoints.remove(endp); | |
641 endPointClosed(endp); | |
642 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
643 |
865 | 644 Selector getSelector() |
645 { | |
646 return _selector; | |
647 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
648 |
865 | 649 void stop() throws Exception |
650 { | |
651 // Spin for a while waiting for selector to complete | |
652 // to avoid unneccessary closed channel exceptions | |
653 try | |
654 { | |
655 for (int i=0;i<100 && _selecting!=null;i++) | |
656 { | |
657 wakeup(); | |
658 Thread.sleep(10); | |
659 } | |
660 } | |
661 catch(Exception e) | |
662 { | |
944
1d24b6e422fa
simplify SelectorManager
Franklin Schmidt <fschmidt@gmail.com>
parents:
943
diff
changeset
|
663 LOG.warn("",e); |
865 | 664 } |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
665 |
865 | 666 // close endpoints and selector |
667 synchronized (this) | |
668 { | |
669 Selector selector=_selector; | |
670 for (SelectionKey key:selector.keys()) | |
671 { | |
672 if (key==null) | |
673 continue; | |
674 Object att=key.attachment(); | |
675 if (att instanceof EndPoint) | |
676 { | |
677 EndPoint endpoint = (EndPoint)att; | |
678 try | |
679 { | |
680 endpoint.close(); | |
681 } | |
682 catch(IOException e) | |
683 { | |
684 LOG.trace("",e); | |
685 } | |
686 } | |
687 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
688 |
865 | 689 try |
690 { | |
691 selector=_selector; | |
692 if (selector != null) | |
693 selector.close(); | |
694 } | |
695 catch (IOException e) | |
696 { | |
697 LOG.trace("",e); | |
698 } | |
944
1d24b6e422fa
simplify SelectorManager
Franklin Schmidt <fschmidt@gmail.com>
parents:
943
diff
changeset
|
699 _selector = null; |
865 | 700 } |
701 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
702 |
865 | 703 public String dump() |
704 { | |
705 return AggregateLifeCycle.dump(this); | |
706 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
707 |
865 | 708 public void dump(Appendable out, String indent) throws IOException |
709 { | |
710 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); | |
944
1d24b6e422fa
simplify SelectorManager
Franklin Schmidt <fschmidt@gmail.com>
parents:
943
diff
changeset
|
711 AggregateLifeCycle.dump(out,indent,Collections.emptyList()); |
865 | 712 } |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
713 |
865 | 714 public String toString() |
715 { | |
716 Selector selector=_selector; | |
717 return String.format("%s keys=%d selected=%d", | |
718 super.toString(), | |
719 selector != null && selector.isOpen() ? selector.keys().size() : -1, | |
720 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); | |
721 } | |
722 } | |
802
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
723 |
3428c60d7cfc
replace jetty jars with source
Franklin Schmidt <fschmidt@gmail.com>
parents:
diff
changeset
|
724 } |