comparison src/org/eclipse/jetty/io/nio/DirectNIOBuffer.java @ 1010:2712133d5bce

simplify Buffer code
author Franklin Schmidt <fschmidt@gmail.com>
date Sun, 23 Oct 2016 22:23:50 -0600
parents 8e9db0bbf4f9
children 80cad9086593
comparison
equal deleted inserted replaced
1009:c3a04bded909 1010:2712133d5bce
33 import org.eclipse.jetty.io.Buffer; 33 import org.eclipse.jetty.io.Buffer;
34 import org.eclipse.jetty.util.IO; 34 import org.eclipse.jetty.util.IO;
35 import org.slf4j.Logger; 35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory; 36 import org.slf4j.LoggerFactory;
37 37
38 /* ------------------------------------------------------------------------------- */ 38
39 /** 39 public final class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
40 *
41 *
42 */
43 public class DirectNIOBuffer extends AbstractBuffer implements NIOBuffer
44 { 40 {
45 private static final Logger LOG = LoggerFactory.getLogger(DirectNIOBuffer.class); 41 private static final Logger LOG = LoggerFactory.getLogger(DirectNIOBuffer.class);
46 42
47 protected final ByteBuffer _buf; 43 protected final ByteBuffer _buf;
48 private ReadableByteChannel _in; 44 private ReadableByteChannel _in;
49 private InputStream _inStream; 45 private InputStream _inStream;
50 private WritableByteChannel _out; 46
51 private OutputStream _outStream; 47 public DirectNIOBuffer(int size)
52 48 {
53 public DirectNIOBuffer(int size) 49 super(READWRITE,NON_VOLATILE);
54 { 50 _buf = ByteBuffer.allocateDirect(size);
55 super(READWRITE,NON_VOLATILE); 51 _buf.position(0);
56 _buf = ByteBuffer.allocateDirect(size); 52 _buf.limit(_buf.capacity());
57 _buf.position(0); 53 }
58 _buf.limit(_buf.capacity()); 54
59 } 55
60 56 @Override
61 public DirectNIOBuffer(ByteBuffer buffer,boolean immutable) 57 public boolean isDirect()
62 { 58 {
63 super(immutable?IMMUTABLE:READWRITE,NON_VOLATILE); 59 return true;
64 if (!buffer.isDirect()) 60 }
65 throw new IllegalArgumentException(); 61
66 _buf = buffer; 62 @Override
67 setGetIndex(buffer.position()); 63 public byte[] array()
68 setPutIndex(buffer.limit()); 64 {
69 } 65 return null;
70 66 }
71 /** 67
72 * @param file 68 @Override
73 */ 69 public int capacity()
74 public DirectNIOBuffer(File file) throws IOException 70 {
75 { 71 return _buf.capacity();
76 super(READONLY,NON_VOLATILE); 72 }
77 FileInputStream fis = null; 73
78 FileChannel fc = null; 74 @Override
79 try 75 public byte peek(int position)
80 { 76 {
81 fis = new FileInputStream(file); 77 return _buf.get(position);
82 fc = fis.getChannel(); 78 }
83 _buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, file.length()); 79
84 setGetIndex(0); 80 @Override
85 setPutIndex((int)file.length()); 81 public int peek(int index, byte[] b, int offset, int length)
86 _access=IMMUTABLE; 82 {
87 } 83 int l = length;
88 finally 84 if (index+l > capacity())
89 { 85 {
90 if (fc != null) try {fc.close();} catch (IOException e){LOG.trace("",e);} 86 l=capacity()-index;
91 IO.close(fis); 87 if (l==0)
92 } 88 return -1;
93 } 89 }
94 90
95 /* ------------------------------------------------------------ */ 91 if (l < 0)
96 public boolean isDirect() 92 return -1;
97 { 93 try
98 return true; 94 {
99 } 95 _buf.position(index);
100 96 _buf.get(b,offset,l);
101 /* ------------------------------------------------------------ */ 97 }
102 public byte[] array() 98 finally
103 { 99 {
104 return null; 100 _buf.position(0);
105 } 101 }
106 102
107 /* ------------------------------------------------------------ */ 103 return l;
108 public int capacity() 104 }
109 { 105
110 return _buf.capacity(); 106 @Override
111 } 107 public void poke(int index, byte b)
112 108 {
113 /* ------------------------------------------------------------ */ 109 if (isReadOnly()) throw new IllegalStateException(__READONLY);
114 public byte peek(int position) 110 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
115 { 111 if (index > capacity())
116 return _buf.get(position); 112 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
117 } 113 _buf.put(index,b);
118 114 }
119 public int peek(int index, byte[] b, int offset, int length) 115
120 { 116 @Override
121 int l = length; 117 public int poke(int index, Buffer src)
122 if (index+l > capacity()) 118 {
123 { 119 if (isReadOnly()) throw new IllegalStateException(__READONLY);
124 l=capacity()-index; 120
125 if (l==0) 121 byte[] array=src.array();
126 return -1; 122 if (array!=null)
127 } 123 {
128 124 return poke(index,array,src.getIndex(),src.length());
129 if (l < 0) 125 }
130 return -1; 126 else
131 try 127 {
132 { 128 Buffer src_buf=src.buffer();
133 _buf.position(index); 129 if (src_buf instanceof DirectNIOBuffer)
134 _buf.get(b,offset,l); 130 {
135 } 131 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf;
136 finally 132 if (src_bytebuf==_buf)
137 { 133 src_bytebuf=_buf.duplicate();
138 _buf.position(0); 134 try
139 } 135 {
140 136 _buf.position(index);
141 return l; 137 int space = _buf.remaining();
142 } 138
143 139 int length=src.length();
144 public void poke(int index, byte b) 140 if (length>space)
145 { 141 length=space;
146 if (isReadOnly()) throw new IllegalStateException(__READONLY); 142
147 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0"); 143 src_bytebuf.position(src.getIndex());
148 if (index > capacity()) 144 src_bytebuf.limit(src.getIndex()+length);
149 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity()); 145
150 _buf.put(index,b); 146 _buf.put(src_bytebuf);
151 } 147 return length;
152 148 }
153 @Override 149 finally
154 public int poke(int index, Buffer src) 150 {
155 { 151 _buf.position(0);
156 if (isReadOnly()) throw new IllegalStateException(__READONLY); 152 src_bytebuf.limit(src_bytebuf.capacity());
157 153 src_bytebuf.position(0);
158 byte[] array=src.array(); 154 }
159 if (array!=null) 155 }
160 { 156 else
161 return poke(index,array,src.getIndex(),src.length()); 157 return super.poke(index,src);
162 } 158 }
163 else 159 }
164 { 160
165 Buffer src_buf=src.buffer(); 161 @Override
166 if (src_buf instanceof DirectNIOBuffer) 162 public int poke(int index, byte[] b, int offset, int length)
167 { 163 {
168 ByteBuffer src_bytebuf = ((DirectNIOBuffer)src_buf)._buf; 164 if (isReadOnly()) throw new IllegalStateException(__READONLY);
169 if (src_bytebuf==_buf) 165
170 src_bytebuf=_buf.duplicate(); 166 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0");
171 try 167
172 { 168 if (index + length > capacity())
173 _buf.position(index); 169 {
174 int space = _buf.remaining(); 170 length=capacity()-index;
175 171 if (length<0)
176 int length=src.length(); 172 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity());
177 if (length>space) 173 }
178 length=space; 174
179 175 try
180 src_bytebuf.position(src.getIndex()); 176 {
181 src_bytebuf.limit(src.getIndex()+length); 177 _buf.position(index);
182 178
183 _buf.put(src_bytebuf); 179 int space=_buf.remaining();
184 return length; 180
185 } 181 if (length>space)
186 finally 182 length=space;
187 { 183 if (length>0)
188 _buf.position(0); 184 _buf.put(b,offset,length);
189 src_bytebuf.limit(src_bytebuf.capacity()); 185 return length;
190 src_bytebuf.position(0); 186 }
191 } 187 finally
192 } 188 {
193 else 189 _buf.position(0);
194 return super.poke(index,src); 190 }
195 } 191 }
196 } 192
197 193 @Override
198 @Override 194 public ByteBuffer getByteBuffer()
199 public int poke(int index, byte[] b, int offset, int length) 195 {
200 { 196 return _buf;
201 if (isReadOnly()) throw new IllegalStateException(__READONLY); 197 }
202 198
203 if (index < 0) throw new IllegalArgumentException("index<0: " + index + "<0"); 199 @Override
204 200 public int readFrom(InputStream in, int max) throws IOException
205 if (index + length > capacity()) 201 {
206 { 202 if (_in==null || !_in.isOpen() || in!=_inStream)
207 length=capacity()-index; 203 {
208 if (length<0) 204 _in = Channels.newChannel(in);
209 throw new IllegalArgumentException("index>capacity(): " + index + ">" + capacity()); 205 _inStream = in;
210 } 206 }
211 207
212 try 208 if (max<0 || max>space())
213 { 209 max=space();
214 _buf.position(index); 210 int p = putIndex();
215 211
216 int space=_buf.remaining(); 212 try
217 213 {
218 if (length>space) 214 int len=0, total=0, available=max;
219 length=space; 215 int loop=0;
220 if (length>0) 216 while (total<max)
221 _buf.put(b,offset,length); 217 {
222 return length; 218 _buf.position(p);
223 } 219 _buf.limit(p+available);
224 finally 220 len = _in.read(_buf);
225 { 221 if (len<0)
226 _buf.position(0); 222 {
227 } 223 _in = null;
228 } 224 _inStream = in;
229 225 break;
230 /* ------------------------------------------------------------ */ 226 }
231 public ByteBuffer getByteBuffer() 227 else if (len>0)
232 { 228 {
233 return _buf; 229 p += len;
234 } 230 total += len;
235 231 available -= len;
236 /* ------------------------------------------------------------ */ 232 setPutIndex(p);
237 @Override 233 loop=0;
238 public int readFrom(InputStream in, int max) throws IOException 234 }
239 { 235 else if (loop++>1)
240 if (_in==null || !_in.isOpen() || in!=_inStream) 236 break;
241 { 237 if (in.available()<=0)
242 _in=Channels.newChannel(in); 238 break;
243 _inStream=in; 239 }
244 } 240 if (len<0 && total==0)
245 241 return -1;
246 if (max<0 || max>space()) 242 return total;
247 max=space(); 243
248 int p = putIndex(); 244 }
249 245 catch(IOException e)
250 try 246 {
251 { 247 _in = null;
252 int len=0, total=0, available=max; 248 _inStream = in;
253 int loop=0; 249 throw e;
254 while (total<max) 250 }
255 { 251 finally
256 _buf.position(p); 252 {
257 _buf.limit(p+available); 253 if (_in!=null && !_in.isOpen())
258 len=_in.read(_buf); 254 {
259 if (len<0) 255 _in = null;
260 { 256 _inStream = in;
261 _in=null; 257 }
262 _inStream=in; 258 _buf.position(0);
263 break; 259 _buf.limit(_buf.capacity());
264 } 260 }
265 else if (len>0) 261 }
266 { 262
267 p += len;
268 total += len;
269 available -= len;
270 setPutIndex(p);
271 loop=0;
272 }
273 else if (loop++>1)
274 break;
275 if (in.available()<=0)
276 break;
277 }
278 if (len<0 && total==0)
279 return -1;
280 return total;
281
282 }
283 catch(IOException e)
284 {
285 _in=null;
286 _inStream=in;
287 throw e;
288 }
289 finally
290 {
291 if (_in!=null && !_in.isOpen())
292 {
293 _in=null;
294 _inStream=in;
295 }
296 _buf.position(0);
297 _buf.limit(_buf.capacity());
298 }
299 }
300
301 /* ------------------------------------------------------------ */
302 @Override
303 public void writeTo(OutputStream out) throws IOException
304 {
305 if (_out==null || !_out.isOpen() || out!=_outStream)
306 {
307 _out=Channels.newChannel(out);
308 _outStream=out;
309 }
310
311 synchronized (_buf)
312 {
313 try
314 {
315 int loop=0;
316 while(hasContent() && _out.isOpen())
317 {
318 _buf.position(getIndex());
319 _buf.limit(putIndex());
320 int len=_out.write(_buf);
321 if (len<0)
322 break;
323 else if (len>0)
324 {
325 skip(len);
326 loop=0;
327 }
328 else if (loop++>1)
329 break;
330 }
331
332 }
333 catch(IOException e)
334 {
335 _out=null;
336 _outStream=null;
337 throw e;
338 }
339 finally
340 {
341 if (_out!=null && !_out.isOpen())
342 {
343 _out=null;
344 _outStream=null;
345 }
346 _buf.position(0);
347 _buf.limit(_buf.capacity());
348 }
349 }
350 }
351
352
353
354 } 263 }