Mercurial Hosting > luan
comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 954:a021c4c9c244
use just one SelectSet per SelectorManager
author | Franklin Schmidt <fschmidt@gmail.com> |
---|---|
date | Thu, 13 Oct 2016 00:54:10 -0600 |
parents | 7db4a488fc82 |
children | d6b6d3e40161 |
comparison
equal
deleted
inserted
replaced
953:7db4a488fc82 | 954:a021c4c9c244 |
---|---|
58 { | 58 { |
59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); | 59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); |
60 | 60 |
61 private int _maxIdleTime; | 61 private int _maxIdleTime; |
62 private long _lowResourcesConnections; | 62 private long _lowResourcesConnections; |
63 private SelectSet[] _selectSet; | 63 private SelectSet _selectSet; |
64 private int _selectSets = 1; | |
65 private volatile int _set=0; | |
66 | 64 |
67 /* ------------------------------------------------------------ */ | 65 /* ------------------------------------------------------------ */ |
68 /** | 66 /** |
69 * @param maxIdleTime The maximum period in milliseconds that a connection may be idle before it is closed. | 67 * @param maxIdleTime The maximum period in milliseconds that a connection may be idle before it is closed. |
70 * @see #setLowResourcesMaxIdleTime(long) | 68 * @see #setLowResourcesMaxIdleTime(long) |
73 { | 71 { |
74 _maxIdleTime = maxIdleTime; | 72 _maxIdleTime = maxIdleTime; |
75 } | 73 } |
76 | 74 |
77 /* ------------------------------------------------------------ */ | 75 /* ------------------------------------------------------------ */ |
78 /** | |
79 * @param selectSets number of select sets to create | |
80 */ | |
81 public void setSelectSets(int selectSets) | |
82 { | |
83 long lrc = _lowResourcesConnections * _selectSets; | |
84 _selectSets=selectSets; | |
85 _lowResourcesConnections=lrc/_selectSets; | |
86 } | |
87 | |
88 /* ------------------------------------------------------------ */ | |
89 /** Register a channel | 76 /** Register a channel |
90 * @param channel | 77 * @param channel |
91 */ | 78 */ |
92 public void register(SocketChannel channel) | 79 public void register(SocketChannel channel) |
93 { | 80 { |
94 // The ++ increment here is not atomic, but it does not matter. | 81 SelectSet set = _selectSet; |
95 // so long as the value changes sometimes, then connections will | 82 if (set!=null) |
96 // be distributed over the available sets. | 83 { |
97 | |
98 int s = _set++; | |
99 if (s<0) | |
100 s=-s; | |
101 s=s%_selectSets; | |
102 SelectSet[] sets = _selectSet; | |
103 if (sets!=null) | |
104 { | |
105 SelectSet set=sets[s]; | |
106 set.addChange(channel); | 84 set.addChange(channel); |
107 } | 85 } |
108 } | 86 } |
109 | 87 |
110 /* ------------------------------------------------------------ */ | 88 /* ------------------------------------------------------------ */ |
111 /** | 89 /** |
112 * @return the lowResourcesConnections | 90 * @return the lowResourcesConnections |
113 */ | 91 */ |
114 public long getLowResourcesConnections() | 92 public long getLowResourcesConnections() |
115 { | 93 { |
116 return _lowResourcesConnections*_selectSets; | 94 return _lowResourcesConnections; |
117 } | 95 } |
118 | 96 |
119 /* ------------------------------------------------------------ */ | 97 /* ------------------------------------------------------------ */ |
120 /** | 98 /** |
121 * Set the number of connections, which if exceeded places this manager in low resources state. | 99 * Set the number of connections, which if exceeded places this manager in low resources state. |
123 * @param lowResourcesConnections the number of connections | 101 * @param lowResourcesConnections the number of connections |
124 * @see #setLowResourcesMaxIdleTime(long) | 102 * @see #setLowResourcesMaxIdleTime(long) |
125 */ | 103 */ |
126 public void setLowResourcesConnections(long lowResourcesConnections) | 104 public void setLowResourcesConnections(long lowResourcesConnections) |
127 { | 105 { |
128 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; | 106 _lowResourcesConnections = lowResourcesConnections; |
129 } | 107 } |
130 | 108 |
131 | 109 |
132 public abstract void execute(Runnable task); | 110 public abstract void execute(Runnable task); |
133 | 111 |
136 * @see org.eclipse.component.AbstractLifeCycle#doStart() | 114 * @see org.eclipse.component.AbstractLifeCycle#doStart() |
137 */ | 115 */ |
138 @Override | 116 @Override |
139 protected void doStart() throws Exception | 117 protected void doStart() throws Exception |
140 { | 118 { |
141 _selectSet = new SelectSet[_selectSets]; | 119 _selectSet = new SelectSet(); |
142 for (int i=0;i<_selectSet.length;i++) | |
143 _selectSet[i]= new SelectSet(i); | |
144 | 120 |
145 super.doStart(); | 121 super.doStart(); |
146 | 122 |
147 // start a thread to Select | 123 // start a thread to Select |
148 for (int i=0;i<_selectSets;i++) | 124 execute(new Runnable() |
149 { | 125 { |
150 final int id=i; | 126 public void run() |
151 execute(new Runnable() | 127 { |
152 { | 128 String name=Thread.currentThread().getName(); |
153 public void run() | 129 try |
154 { | 130 { |
155 String name=Thread.currentThread().getName(); | 131 SelectSet set = _selectSet; |
156 try | 132 if (set==null) |
133 return; | |
134 | |
135 Thread.currentThread().setName(name+" Selector"); | |
136 LOG.debug("Starting {} on {}",Thread.currentThread(),this); | |
137 while (isRunning()) | |
157 { | 138 { |
158 SelectSet[] sets=_selectSet; | 139 try |
159 if (sets==null) | |
160 return; | |
161 SelectSet set=sets[id]; | |
162 | |
163 Thread.currentThread().setName(name+" Selector"+id); | |
164 LOG.debug("Starting {} on {}",Thread.currentThread(),this); | |
165 while (isRunning()) | |
166 { | 140 { |
167 try | 141 set.doSelect(); |
168 { | 142 } |
169 set.doSelect(); | 143 catch(IOException e) |
170 } | 144 { |
171 catch(IOException e) | 145 LOG.trace("",e); |
172 { | 146 } |
173 LOG.trace("",e); | 147 catch(Exception e) |
174 } | 148 { |
175 catch(Exception e) | 149 LOG.warn("",e); |
176 { | |
177 LOG.warn("",e); | |
178 } | |
179 } | 150 } |
180 } | 151 } |
181 finally | 152 } |
182 { | 153 finally |
183 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); | 154 { |
184 Thread.currentThread().setName(name); | 155 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); |
185 } | 156 Thread.currentThread().setName(name); |
186 } | 157 } |
187 | 158 } |
188 }); | 159 |
189 } | 160 }); |
190 } | 161 } |
191 | 162 |
192 | 163 |
193 /* ------------------------------------------------------------------------------- */ | 164 /* ------------------------------------------------------------------------------- */ |
194 @Override | 165 @Override |
195 protected void doStop() throws Exception | 166 protected void doStop() throws Exception |
196 { | 167 { |
197 SelectSet[] sets= _selectSet; | 168 SelectSet set = _selectSet; |
198 _selectSet=null; | 169 _selectSet = null; |
199 if (sets!=null) | 170 if (set!=null) |
200 { | 171 { |
201 for (SelectSet set : sets) | 172 set.stop(); |
202 { | |
203 if (set!=null) | |
204 set.stop(); | |
205 } | |
206 } | 173 } |
207 super.doStop(); | 174 super.doStop(); |
208 } | 175 } |
209 | 176 |
210 public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment); | 177 public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment); |
215 } | 182 } |
216 | 183 |
217 public void dump(Appendable out, String indent) throws IOException | 184 public void dump(Appendable out, String indent) throws IOException |
218 { | 185 { |
219 AggregateLifeCycle.dumpObject(out,this); | 186 AggregateLifeCycle.dumpObject(out,this); |
220 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); | 187 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_selectSet)); |
221 } | 188 } |
222 | 189 |
223 | 190 |
224 public class SelectSet implements Dumpable | 191 public final class SelectSet implements Dumpable |
225 { | 192 { |
226 private final int _setID; | |
227 private volatile long _now = System.currentTimeMillis(); | 193 private volatile long _now = System.currentTimeMillis(); |
228 | 194 |
229 private volatile SaneSelector _selector; | 195 private final SaneSelector _selector; |
230 | 196 |
231 private volatile Thread _selecting; | |
232 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); | 197 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); |
233 | 198 |
234 SelectSet(int acceptorID) throws Exception | 199 SelectSet() throws IOException |
235 { | 200 { |
236 _setID=acceptorID; | |
237 | |
238 // create a selector; | |
239 _selector = new SaneSelector(); | 201 _selector = new SaneSelector(); |
240 } | 202 } |
241 | 203 |
242 private void addChange(SocketChannel channel) | 204 private void addChange(SocketChannel channel) |
243 { | 205 { |
258 } catch(IOException e2) { | 220 } catch(IOException e2) { |
259 LOG.warn("",e2); | 221 LOG.warn("",e2); |
260 } | 222 } |
261 } | 223 } |
262 } | 224 } |
263 /* ------------------------------------------------------------ */ | 225 |
264 /** | |
265 * Select and dispatch tasks found from changes and the selector. | |
266 * | |
267 * @throws IOException | |
268 */ | |
269 private void doSelect() throws IOException | 226 private void doSelect() throws IOException |
270 { | 227 { |
271 try | 228 try |
272 { | 229 { |
273 _selecting=Thread.currentThread(); | 230 _selector.select(); |
274 final SaneSelector selector = _selector; | |
275 // Stopped concurrently ? | |
276 if (selector == null) | |
277 return; | |
278 | |
279 selector.select(); | |
280 | 231 |
281 // Look for things to do | 232 // Look for things to do |
282 for (SelectionKey key: selector.selectedKeys()) | 233 for (SelectionKey key: _selector.selectedKeys()) |
283 { | 234 { |
284 try | 235 try |
285 { | 236 { |
286 if (!key.isValid()) | 237 if (!key.isValid()) |
287 { | 238 { |
294 | 245 |
295 if (key.isReadable()||key.isWritable()) { | 246 if (key.isReadable()||key.isWritable()) { |
296 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); | 247 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); |
297 endpoint.schedule(); | 248 endpoint.schedule(); |
298 } | 249 } |
299 key = null; | |
300 } | 250 } |
301 catch (CancelledKeyException e) | 251 catch (CancelledKeyException e) |
302 { | 252 { |
303 LOG.trace("",e); | 253 LOG.trace("",e); |
304 } | 254 } |
310 LOG.trace("",e); | 260 LOG.trace("",e); |
311 } | 261 } |
312 } | 262 } |
313 | 263 |
314 // Everything always handled | 264 // Everything always handled |
315 selector.selectedKeys().clear(); | 265 _selector.selectedKeys().clear(); |
316 | 266 |
317 _now = System.currentTimeMillis(); | 267 _now = System.currentTimeMillis(); |
318 } | 268 } |
319 catch (ClosedSelectorException e) | 269 catch (ClosedSelectorException e) |
320 { | 270 { |
324 LOG.trace("",e); | 274 LOG.trace("",e); |
325 } | 275 } |
326 catch (CancelledKeyException e) | 276 catch (CancelledKeyException e) |
327 { | 277 { |
328 LOG.trace("",e); | 278 LOG.trace("",e); |
329 } | |
330 finally | |
331 { | |
332 _selecting=null; | |
333 } | 279 } |
334 } | 280 } |
335 | 281 |
336 public SelectorManager getManager() | 282 public SelectorManager getManager() |
337 { | 283 { |
367 synchronized void stop() throws Exception | 313 synchronized void stop() throws Exception |
368 { | 314 { |
369 // close endpoints and selector | 315 // close endpoints and selector |
370 for (SelectionKey key : _selector.keys()) | 316 for (SelectionKey key : _selector.keys()) |
371 { | 317 { |
372 Object att=key.attachment(); | 318 EndPoint endpoint = (EndPoint)key.attachment(); |
373 if (att instanceof EndPoint) | 319 try |
374 { | 320 { |
375 EndPoint endpoint = (EndPoint)att; | 321 endpoint.close(); |
376 try | 322 } |
377 { | 323 catch(IOException e) |
378 endpoint.close(); | 324 { |
379 } | 325 LOG.trace("",e); |
380 catch(IOException e) | |
381 { | |
382 LOG.trace("",e); | |
383 } | |
384 } | 326 } |
385 } | 327 } |
386 | 328 |
387 try | 329 try |
388 { | 330 { |
390 } | 332 } |
391 catch (IOException e) | 333 catch (IOException e) |
392 { | 334 { |
393 LOG.trace("",e); | 335 LOG.trace("",e); |
394 } | 336 } |
395 _selector = null; | |
396 } | 337 } |
397 | 338 |
398 public String dump() | 339 public String dump() |
399 { | 340 { |
400 return AggregateLifeCycle.dump(this); | 341 return AggregateLifeCycle.dump(this); |
401 } | 342 } |
402 | 343 |
403 public void dump(Appendable out, String indent) throws IOException | 344 public void dump(Appendable out, String indent) throws IOException |
404 { | 345 { |
405 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); | 346 out.append(String.valueOf(this)).append("\n"); |
406 AggregateLifeCycle.dump(out,indent,Collections.emptyList()); | 347 AggregateLifeCycle.dump(out,indent,Collections.emptyList()); |
407 } | 348 } |
408 | 349 |
409 public String toString() | 350 public String toString() |
410 { | 351 { |