comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 954:a021c4c9c244

use just one SelectSet per SelectorManager
author Franklin Schmidt <fschmidt@gmail.com>
date Thu, 13 Oct 2016 00:54:10 -0600
parents 7db4a488fc82
children d6b6d3e40161
comparison
equal deleted inserted replaced
953:7db4a488fc82 954:a021c4c9c244
58 { 58 {
59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); 59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio");
60 60
61 private int _maxIdleTime; 61 private int _maxIdleTime;
62 private long _lowResourcesConnections; 62 private long _lowResourcesConnections;
63 private SelectSet[] _selectSet; 63 private SelectSet _selectSet;
64 private int _selectSets = 1;
65 private volatile int _set=0;
66 64
67 /* ------------------------------------------------------------ */ 65 /* ------------------------------------------------------------ */
68 /** 66 /**
69 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. 67 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
70 * @see #setLowResourcesMaxIdleTime(long) 68 * @see #setLowResourcesMaxIdleTime(long)
73 { 71 {
74 _maxIdleTime = maxIdleTime; 72 _maxIdleTime = maxIdleTime;
75 } 73 }
76 74
77 /* ------------------------------------------------------------ */ 75 /* ------------------------------------------------------------ */
78 /**
79 * @param selectSets number of select sets to create
80 */
81 public void setSelectSets(int selectSets)
82 {
83 long lrc = _lowResourcesConnections * _selectSets;
84 _selectSets=selectSets;
85 _lowResourcesConnections=lrc/_selectSets;
86 }
87
88 /* ------------------------------------------------------------ */
89 /** Register a channel 76 /** Register a channel
90 * @param channel 77 * @param channel
91 */ 78 */
92 public void register(SocketChannel channel) 79 public void register(SocketChannel channel)
93 { 80 {
94 // The ++ increment here is not atomic, but it does not matter. 81 SelectSet set = _selectSet;
95 // so long as the value changes sometimes, then connections will 82 if (set!=null)
96 // be distributed over the available sets. 83 {
97
98 int s = _set++;
99 if (s<0)
100 s=-s;
101 s=s%_selectSets;
102 SelectSet[] sets = _selectSet;
103 if (sets!=null)
104 {
105 SelectSet set=sets[s];
106 set.addChange(channel); 84 set.addChange(channel);
107 } 85 }
108 } 86 }
109 87
110 /* ------------------------------------------------------------ */ 88 /* ------------------------------------------------------------ */
111 /** 89 /**
112 * @return the lowResourcesConnections 90 * @return the lowResourcesConnections
113 */ 91 */
114 public long getLowResourcesConnections() 92 public long getLowResourcesConnections()
115 { 93 {
116 return _lowResourcesConnections*_selectSets; 94 return _lowResourcesConnections;
117 } 95 }
118 96
119 /* ------------------------------------------------------------ */ 97 /* ------------------------------------------------------------ */
120 /** 98 /**
121 * Set the number of connections, which if exceeded places this manager in low resources state. 99 * Set the number of connections, which if exceeded places this manager in low resources state.
123 * @param lowResourcesConnections the number of connections 101 * @param lowResourcesConnections the number of connections
124 * @see #setLowResourcesMaxIdleTime(long) 102 * @see #setLowResourcesMaxIdleTime(long)
125 */ 103 */
126 public void setLowResourcesConnections(long lowResourcesConnections) 104 public void setLowResourcesConnections(long lowResourcesConnections)
127 { 105 {
128 _lowResourcesConnections=(lowResourcesConnections+_selectSets-1)/_selectSets; 106 _lowResourcesConnections = lowResourcesConnections;
129 } 107 }
130 108
131 109
132 public abstract void execute(Runnable task); 110 public abstract void execute(Runnable task);
133 111
136 * @see org.eclipse.component.AbstractLifeCycle#doStart() 114 * @see org.eclipse.component.AbstractLifeCycle#doStart()
137 */ 115 */
138 @Override 116 @Override
139 protected void doStart() throws Exception 117 protected void doStart() throws Exception
140 { 118 {
141 _selectSet = new SelectSet[_selectSets]; 119 _selectSet = new SelectSet();
142 for (int i=0;i<_selectSet.length;i++)
143 _selectSet[i]= new SelectSet(i);
144 120
145 super.doStart(); 121 super.doStart();
146 122
147 // start a thread to Select 123 // start a thread to Select
148 for (int i=0;i<_selectSets;i++) 124 execute(new Runnable()
149 { 125 {
150 final int id=i; 126 public void run()
151 execute(new Runnable() 127 {
152 { 128 String name=Thread.currentThread().getName();
153 public void run() 129 try
154 { 130 {
155 String name=Thread.currentThread().getName(); 131 SelectSet set = _selectSet;
156 try 132 if (set==null)
133 return;
134
135 Thread.currentThread().setName(name+" Selector");
136 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
137 while (isRunning())
157 { 138 {
158 SelectSet[] sets=_selectSet; 139 try
159 if (sets==null)
160 return;
161 SelectSet set=sets[id];
162
163 Thread.currentThread().setName(name+" Selector"+id);
164 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
165 while (isRunning())
166 { 140 {
167 try 141 set.doSelect();
168 { 142 }
169 set.doSelect(); 143 catch(IOException e)
170 } 144 {
171 catch(IOException e) 145 LOG.trace("",e);
172 { 146 }
173 LOG.trace("",e); 147 catch(Exception e)
174 } 148 {
175 catch(Exception e) 149 LOG.warn("",e);
176 {
177 LOG.warn("",e);
178 }
179 } 150 }
180 } 151 }
181 finally 152 }
182 { 153 finally
183 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); 154 {
184 Thread.currentThread().setName(name); 155 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
185 } 156 Thread.currentThread().setName(name);
186 } 157 }
187 158 }
188 }); 159
189 } 160 });
190 } 161 }
191 162
192 163
193 /* ------------------------------------------------------------------------------- */ 164 /* ------------------------------------------------------------------------------- */
194 @Override 165 @Override
195 protected void doStop() throws Exception 166 protected void doStop() throws Exception
196 { 167 {
197 SelectSet[] sets= _selectSet; 168 SelectSet set = _selectSet;
198 _selectSet=null; 169 _selectSet = null;
199 if (sets!=null) 170 if (set!=null)
200 { 171 {
201 for (SelectSet set : sets) 172 set.stop();
202 {
203 if (set!=null)
204 set.stop();
205 }
206 } 173 }
207 super.doStop(); 174 super.doStop();
208 } 175 }
209 176
210 public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment); 177 public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint, Object attachment);
215 } 182 }
216 183
217 public void dump(Appendable out, String indent) throws IOException 184 public void dump(Appendable out, String indent) throws IOException
218 { 185 {
219 AggregateLifeCycle.dumpObject(out,this); 186 AggregateLifeCycle.dumpObject(out,this);
220 AggregateLifeCycle.dump(out,indent,TypeUtil.asList(_selectSet)); 187 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_selectSet));
221 } 188 }
222 189
223 190
224 public class SelectSet implements Dumpable 191 public final class SelectSet implements Dumpable
225 { 192 {
226 private final int _setID;
227 private volatile long _now = System.currentTimeMillis(); 193 private volatile long _now = System.currentTimeMillis();
228 194
229 private volatile SaneSelector _selector; 195 private final SaneSelector _selector;
230 196
231 private volatile Thread _selecting;
232 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>(); 197 private ConcurrentMap<SelectChannelEndPoint,Object> _endPoints = new ConcurrentHashMap<SelectChannelEndPoint, Object>();
233 198
234 SelectSet(int acceptorID) throws Exception 199 SelectSet() throws IOException
235 { 200 {
236 _setID=acceptorID;
237
238 // create a selector;
239 _selector = new SaneSelector(); 201 _selector = new SaneSelector();
240 } 202 }
241 203
242 private void addChange(SocketChannel channel) 204 private void addChange(SocketChannel channel)
243 { 205 {
258 } catch(IOException e2) { 220 } catch(IOException e2) {
259 LOG.warn("",e2); 221 LOG.warn("",e2);
260 } 222 }
261 } 223 }
262 } 224 }
263 /* ------------------------------------------------------------ */ 225
264 /**
265 * Select and dispatch tasks found from changes and the selector.
266 *
267 * @throws IOException
268 */
269 private void doSelect() throws IOException 226 private void doSelect() throws IOException
270 { 227 {
271 try 228 try
272 { 229 {
273 _selecting=Thread.currentThread(); 230 _selector.select();
274 final SaneSelector selector = _selector;
275 // Stopped concurrently ?
276 if (selector == null)
277 return;
278
279 selector.select();
280 231
281 // Look for things to do 232 // Look for things to do
282 for (SelectionKey key: selector.selectedKeys()) 233 for (SelectionKey key: _selector.selectedKeys())
283 { 234 {
284 try 235 try
285 { 236 {
286 if (!key.isValid()) 237 if (!key.isValid())
287 { 238 {
294 245
295 if (key.isReadable()||key.isWritable()) { 246 if (key.isReadable()||key.isWritable()) {
296 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment(); 247 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
297 endpoint.schedule(); 248 endpoint.schedule();
298 } 249 }
299 key = null;
300 } 250 }
301 catch (CancelledKeyException e) 251 catch (CancelledKeyException e)
302 { 252 {
303 LOG.trace("",e); 253 LOG.trace("",e);
304 } 254 }
310 LOG.trace("",e); 260 LOG.trace("",e);
311 } 261 }
312 } 262 }
313 263
314 // Everything always handled 264 // Everything always handled
315 selector.selectedKeys().clear(); 265 _selector.selectedKeys().clear();
316 266
317 _now = System.currentTimeMillis(); 267 _now = System.currentTimeMillis();
318 } 268 }
319 catch (ClosedSelectorException e) 269 catch (ClosedSelectorException e)
320 { 270 {
324 LOG.trace("",e); 274 LOG.trace("",e);
325 } 275 }
326 catch (CancelledKeyException e) 276 catch (CancelledKeyException e)
327 { 277 {
328 LOG.trace("",e); 278 LOG.trace("",e);
329 }
330 finally
331 {
332 _selecting=null;
333 } 279 }
334 } 280 }
335 281
336 public SelectorManager getManager() 282 public SelectorManager getManager()
337 { 283 {
367 synchronized void stop() throws Exception 313 synchronized void stop() throws Exception
368 { 314 {
369 // close endpoints and selector 315 // close endpoints and selector
370 for (SelectionKey key : _selector.keys()) 316 for (SelectionKey key : _selector.keys())
371 { 317 {
372 Object att=key.attachment(); 318 EndPoint endpoint = (EndPoint)key.attachment();
373 if (att instanceof EndPoint) 319 try
374 { 320 {
375 EndPoint endpoint = (EndPoint)att; 321 endpoint.close();
376 try 322 }
377 { 323 catch(IOException e)
378 endpoint.close(); 324 {
379 } 325 LOG.trace("",e);
380 catch(IOException e)
381 {
382 LOG.trace("",e);
383 }
384 } 326 }
385 } 327 }
386 328
387 try 329 try
388 { 330 {
390 } 332 }
391 catch (IOException e) 333 catch (IOException e)
392 { 334 {
393 LOG.trace("",e); 335 LOG.trace("",e);
394 } 336 }
395 _selector = null;
396 } 337 }
397 338
398 public String dump() 339 public String dump()
399 { 340 {
400 return AggregateLifeCycle.dump(this); 341 return AggregateLifeCycle.dump(this);
401 } 342 }
402 343
403 public void dump(Appendable out, String indent) throws IOException 344 public void dump(Appendable out, String indent) throws IOException
404 { 345 {
405 out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_setID)).append("\n"); 346 out.append(String.valueOf(this)).append("\n");
406 AggregateLifeCycle.dump(out,indent,Collections.emptyList()); 347 AggregateLifeCycle.dump(out,indent,Collections.emptyList());
407 } 348 }
408 349
409 public String toString() 350 public String toString()
410 { 351 {