comparison src/org/eclipse/jetty/io/nio/SelectorManager.java @ 964:768414c16e10

remove SelectSet
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 14 Oct 2016 01:03:47 -0600
parents 94498d6daf5b
children 0d20943cfea2
comparison
equal deleted inserted replaced
963:4b6216fa9cec 964:768414c16e10
54 * NIO scheduling to scale to large numbers of connections. 54 * NIO scheduling to scale to large numbers of connections.
55 * <p> 55 * <p>
56 */ 56 */
57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable 57 public abstract class SelectorManager extends AbstractLifeCycle implements Dumpable
58 { 58 {
59 public static final Logger LOG=LoggerFactory.getLogger("org.eclipse.jetty.io.nio"); 59 public static final Logger LOG = LoggerFactory.getLogger("org.eclipse.jetty.io.nio");
60 60
61 private int _maxIdleTime; 61 private final int _maxIdleTime;
62 private long _lowResourcesConnections; 62 private volatile long _now = System.currentTimeMillis();
63 private SelectSet _selectSet; 63 private SaneSelector _selector;
64 64
65 /* ------------------------------------------------------------ */ 65 /* ------------------------------------------------------------ */
66 /** 66 /**
67 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed. 67 * @param maxIdleTime The maximum period in milli seconds that a connection may be idle before it is closed.
68 * @see #setLowResourcesMaxIdleTime(long) 68 * @see #setLowResourcesMaxIdleTime(long)
69 */ 69 */
70 public void setMaxIdleTime(int maxIdleTime) 70 public SelectorManager(int maxIdleTime)
71 { 71 {
72 _maxIdleTime = maxIdleTime; 72 _maxIdleTime = maxIdleTime;
73 } 73 }
74 74
75 /* ------------------------------------------------------------ */ 75 /* ------------------------------------------------------------ */
76 /** Register a channel 76 /** Register a channel
77 * @param channel 77 * @param channel
78 */ 78 */
79 public void register(SocketChannel channel) 79 public final void register(SocketChannel channel)
80 { 80 {
81 /*
81 SelectSet set = _selectSet; 82 SelectSet set = _selectSet;
82 if (set!=null) 83 if (set!=null)
83 { 84 {
84 set.addChange(channel); 85 set.addChange(channel);
85 } 86 }
86 } 87 */
87 88 try {
88 /* ------------------------------------------------------------ */ 89 SelectionKey key = _selector.register(channel,0,null);
89 /** 90 SelectChannelEndPoint endpoint = new SelectChannelEndPoint(channel,this,key, _maxIdleTime);
90 * @return the lowResourcesConnections 91 endpoint.setConnection(newConnection(channel,endpoint));
91 */ 92 key.attach(endpoint);
92 public long getLowResourcesConnections() 93 _selector.update();
93 { 94 //System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq b");
94 return _lowResourcesConnections; 95 endpoint.schedule();
95 } 96 } catch(IOException e) {
96 97 LOG.warn("",e);
97 /* ------------------------------------------------------------ */ 98 try {
98 /** 99 channel.close();
99 * Set the number of connections, which if exceeded places this manager in low resources state. 100 } catch(IOException e2) {
100 * This is not an exact measure as the connection count is averaged over the select sets. 101 LOG.warn("",e2);
101 * @param lowResourcesConnections the number of connections 102 }
102 * @see #setLowResourcesMaxIdleTime(long) 103 }
103 */
104 public void setLowResourcesConnections(long lowResourcesConnections)
105 {
106 _lowResourcesConnections = lowResourcesConnections;
107 } 104 }
108 105
109 106
110 public abstract void execute(Runnable task); 107 public abstract void execute(Runnable task);
111 108
112 /* ------------------------------------------------------------ */ 109
113 /* (non-Javadoc)
114 * @see org.eclipse.component.AbstractLifeCycle#doStart()
115 */
116 @Override 110 @Override
117 protected void doStart() throws Exception 111 protected void doStart() throws Exception
118 { 112 {
119 _selectSet = new SelectSet(); 113 _selector = new SaneSelector();
120 114
121 super.doStart(); 115 super.doStart();
122 116
123 // start a thread to Select 117 // start a thread to Select
124 execute(new Runnable() 118 execute(new Runnable()
126 public void run() 120 public void run()
127 { 121 {
128 String name=Thread.currentThread().getName(); 122 String name=Thread.currentThread().getName();
129 try 123 try
130 { 124 {
131 SelectSet set = _selectSet; 125 if (_selector==null)
132 if (set==null)
133 return; 126 return;
134 127
135 Thread.currentThread().setName(name+" Selector"); 128 Thread.currentThread().setName(name+" Selector");
136 LOG.debug("Starting {} on {}",Thread.currentThread(),this); 129 LOG.debug("Starting {} on {}",Thread.currentThread(),this);
137 while (isRunning()) 130 while (isRunning())
138 { 131 {
139 try 132 try
140 { 133 {
141 set.doSelect(); 134 doSelect();
142 } 135 }
143 catch(IOException e) 136 catch(IOException e)
144 { 137 {
145 LOG.trace("",e); 138 LOG.trace("",e);
146 } 139 }
150 } 143 }
151 } 144 }
152 } 145 }
153 finally 146 finally
154 { 147 {
148 _selector = null;
155 LOG.debug("Stopped {} on {}",Thread.currentThread(),this); 149 LOG.debug("Stopped {} on {}",Thread.currentThread(),this);
156 Thread.currentThread().setName(name); 150 Thread.currentThread().setName(name);
157 } 151 }
158 } 152 }
159 153
160 }); 154 });
161 } 155 }
162 156
163 157
164 @Override 158 @Override
165 protected void doStop() throws Exception 159 protected synchronized void doStop() throws Exception
166 { 160 {
167 SelectSet set = _selectSet; 161 if (_selector!=null)
168 _selectSet = null;
169 if (set!=null)
170 {
171 set.stop();
172 }
173 super.doStop();
174 }
175
176 public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
177
178 @Override
179 public String dump()
180 {
181 return AggregateLifeCycle.dump(this);
182 }
183
184 @Override
185 public void dump(Appendable out, String indent) throws IOException
186 {
187 AggregateLifeCycle.dumpObject(out,this);
188 AggregateLifeCycle.dump(out,indent,Collections.singletonList(_selectSet));
189 }
190
191
192 public final class SelectSet implements Dumpable
193 {
194 private volatile long _now = System.currentTimeMillis();
195
196 private final SaneSelector _selector;
197
198 private SelectSet() throws IOException
199 {
200 _selector = new SaneSelector();
201 }
202
203 private void addChange(SocketChannel channel)
204 {
205 try {
206 //System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq a");
207 // SelectionKey key = _selector.register(channel,SelectionKey.OP_READ,null);
208 SelectionKey key = _selector.register(channel,0,null);
209
210 SelectChannelEndPoint endpoint = new SelectChannelEndPoint(channel,this,key, _maxIdleTime);
211 endpoint.setConnection(newConnection(channel,endpoint));
212
213 key.attach(endpoint);
214 // key.interestOps(SelectionKey.OP_READ);
215 _selector.update();
216 //System.out.println("qqqqqqqqqqqqqqqqqqqqqqqqqqqqq b");
217 endpoint.schedule();
218 } catch(IOException e) {
219 LOG.warn("",e);
220 try {
221 channel.close();
222 } catch(IOException e2) {
223 LOG.warn("",e2);
224 }
225 }
226 }
227
228 private void doSelect() throws IOException
229 {
230 try
231 {
232 _selector.select();
233
234 // Look for things to do
235 for (SelectionKey key: _selector.selectedKeys())
236 {
237 try
238 {
239 if (!key.isValid())
240 {
241 key.cancel();
242 continue;
243 }
244
245 if (key.isReadable()||key.isWritable()) {
246 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
247 endpoint.schedule();
248 }
249 }
250 catch (CancelledKeyException e)
251 {
252 // LOG.trace("",e);
253 LOG.warn("",e);
254 }
255 catch (Exception e)
256 {
257 LOG.warn("",e);
258 }
259 }
260
261 // Everything always handled
262 _selector.selectedKeys().clear();
263
264 _now = System.currentTimeMillis();
265 }
266 catch (ClosedSelectorException e)
267 {
268 if (isRunning())
269 LOG.warn("",e);
270 else
271 LOG.trace("",e);
272 }
273 catch (CancelledKeyException e)
274 {
275 LOG.trace("",e);
276 }
277 }
278
279 public SelectorManager getManager()
280 {
281 return SelectorManager.this;
282 }
283
284 public long getNow()
285 {
286 return _now;
287 }
288
289 SaneSelector getSelector()
290 {
291 return _selector;
292 }
293
294 private synchronized void stop() throws Exception
295 { 162 {
296 // close endpoints and selector 163 // close endpoints and selector
297 for (SelectionKey key : _selector.keys()) 164 for (SelectionKey key : _selector.keys())
298 { 165 {
299 EndPoint endpoint = (EndPoint)key.attachment(); 166 EndPoint endpoint = (EndPoint)key.attachment();
304 catch(IOException e) 171 catch(IOException e)
305 { 172 {
306 LOG.trace("",e); 173 LOG.trace("",e);
307 } 174 }
308 } 175 }
309 176
310 try 177 try
311 { 178 {
312 _selector.close(); 179 _selector.close();
313 } 180 }
314 catch (IOException e) 181 catch (IOException e)
315 { 182 {
316 LOG.trace("",e); 183 LOG.trace("",e);
317 } 184 }
318 } 185 }
319 186 super.doStop();
320 @Override 187 }
321 public String dump() 188
322 { 189 public abstract AsyncConnection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);
323 return AggregateLifeCycle.dump(this); 190
324 } 191 @Override
325 192 public String dump()
326 @Override 193 {
327 public void dump(Appendable out, String indent) throws IOException 194 return AggregateLifeCycle.dump(this);
328 { 195 }
329 out.append(String.valueOf(this)).append("\n"); 196
330 AggregateLifeCycle.dump(out,indent,Collections.emptyList()); 197 @Override
331 } 198 public void dump(Appendable out, String indent) throws IOException
332 199 {
333 @Override 200 AggregateLifeCycle.dumpObject(out,this);
334 public String toString() 201 // AggregateLifeCycle.dump(out,indent,Collections.emptyList());
335 { 202 }
336 SaneSelector selector=_selector; 203
337 return String.format("%s keys=%d selected=%d", 204
338 super.toString(), 205 // from SelectSet
339 selector != null && selector.isOpen() ? selector.keys().size() : -1, 206
340 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1); 207 private void doSelect() throws IOException
341 } 208 {
209 try
210 {
211 _selector.select();
212
213 // Look for things to do
214 for (SelectionKey key: _selector.selectedKeys())
215 {
216 try
217 {
218 if (!key.isValid())
219 {
220 key.cancel();
221 continue;
222 }
223
224 if (key.isReadable()||key.isWritable()) {
225 SelectChannelEndPoint endpoint = (SelectChannelEndPoint)key.attachment();
226 endpoint.schedule();
227 }
228 }
229 catch (CancelledKeyException e)
230 {
231 // LOG.trace("",e);
232 LOG.warn("",e);
233 }
234 catch (Exception e)
235 {
236 LOG.warn("",e);
237 }
238 }
239
240 // Everything always handled
241 _selector.selectedKeys().clear();
242
243 _now = System.currentTimeMillis();
244 }
245 catch (ClosedSelectorException e)
246 {
247 if (isRunning())
248 LOG.warn("",e);
249 else
250 LOG.trace("",e);
251 }
252 catch (CancelledKeyException e)
253 {
254 LOG.trace("",e);
255 }
256 }
257
258 public final long getNow()
259 {
260 return _now;
261 }
262
263 final SaneSelector getSelector()
264 {
265 return _selector;
266 }
267
268 @Override
269 public final String toString()
270 {
271 SaneSelector selector=_selector;
272 return String.format("%s keys=%d selected=%d",
273 super.toString(),
274 selector != null && selector.isOpen() ? selector.keys().size() : -1,
275 selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
342 } 276 }
343 277
344 } 278 }