comparison src/goodjava/io/BufferedInputStream.java @ 1480:1f41e5921090

input buffering
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 24 Apr 2020 14:32:20 -0600
parents bd13aaeaf6d4
children 97740900c820
comparison
equal deleted inserted replaced
1479:bd13aaeaf6d4 1480:1f41e5921090
1 /* 1 /*
2 * Copyright (c) 1994, 2013, Oracle and/or its affiliates. All rights reserved. 2 * Copyright (c) 1994, 2013, Oracle and/or its affiliates. All rights reserved.
3 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms. 3 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
4 *
5 *
6 *
7 *
8 *
9 *
10 *
11 *
12 *
13 *
14 *
15 *
16 *
17 *
18 *
19 *
20 *
21 *
22 *
23 *
24 */ 4 */
25 5
26 package goodjava.io; 6 package goodjava.io;
27 7
28 import java.io.InputStream; 8 import java.io.InputStream;
29 import java.io.FilterInputStream; 9 import java.io.FilterInputStream;
30 import java.io.IOException; 10 import java.io.IOException;
31 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
32 11
33 /** 12 /**
34 * A <code>BufferedInputStream</code> adds 13 * A <code>BufferedInputStream</code> adds
35 * functionality to another input stream-namely, 14 * functionality to another input stream-namely,
36 * the ability to buffer the input and to 15 * the ability to buffer the input. When the <code>BufferedInputStream</code>
37 * support the <code>mark</code> and <code>reset</code>
38 * methods. When the <code>BufferedInputStream</code>
39 * is created, an internal buffer array is 16 * is created, an internal buffer array is
40 * created. As bytes from the stream are read 17 * created. As bytes from the stream are read
41 * or skipped, the internal buffer is refilled 18 * or skipped, the internal buffer is refilled
42 * as necessary from the contained input stream, 19 * as necessary from the contained input stream,
43 * many bytes at a time. The <code>mark</code> 20 * many bytes at a time.
44 * operation remembers a point in the input
45 * stream and the <code>reset</code> operation
46 * causes all the bytes read since the most
47 * recent <code>mark</code> operation to be
48 * reread before new bytes are taken from
49 * the contained input stream.
50 * 21 *
51 * @author Arthur van Hoff 22 * @author Arthur van Hoff
52 * @since JDK1.0 23 * @since JDK1.0
53 */ 24 */
54 public class BufferedInputStream extends FilterInputStream { 25 public final class BufferedInputStream extends FilterInputStream {
55
56 private static int DEFAULT_BUFFER_SIZE = 8192; 26 private static int DEFAULT_BUFFER_SIZE = 8192;
57 27 private final byte buf[];
58 /** 28 private boolean isClosed = false;
59 * The maximum size of array to allocate.
60 * Some VMs reserve some header words in an array.
61 * Attempts to allocate larger arrays may result in
62 * OutOfMemoryError: Requested array size exceeds VM limit
63 */
64 private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
65
66 /**
67 * The internal buffer array where the data is stored. When necessary,
68 * it may be replaced by another array of
69 * a different size.
70 */
71 protected volatile byte buf[];
72
73 /**
74 * Atomic updater to provide compareAndSet for buf. This is
75 * necessary because closes can be asynchronous. We use nullness
76 * of buf[] as primary indicator that this stream is closed. (The
77 * "in" field is also nulled out on close.)
78 */
79 private static final
80 AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
81 AtomicReferenceFieldUpdater.newUpdater
82 (BufferedInputStream.class, byte[].class, "buf");
83 29
84 /** 30 /**
85 * The index one greater than the index of the last valid byte in 31 * The index one greater than the index of the last valid byte in
86 * the buffer. 32 * the buffer.
87 * This value is always 33 * This value is always
88 * in the range <code>0</code> through <code>buf.length</code>; 34 * in the range <code>0</code> through <code>buf.length</code>;
89 * elements <code>buf[0]</code> through <code>buf[count-1] 35 * elements <code>buf[0]</code> through <code>buf[count-1]
90 * </code>contain buffered input data obtained 36 * </code>contain buffered input data obtained
91 * from the underlying input stream. 37 * from the underlying input stream.
92 */ 38 */
93 protected int count; 39 private int count;
94 40
95 /** 41 /**
96 * The current position in the buffer. This is the index of the next 42 * The current position in the buffer. This is the index of the next
97 * character to be read from the <code>buf</code> array. 43 * character to be read from the <code>buf</code> array.
98 * <p> 44 * <p>
105 * operation will require more bytes to be 51 * operation will require more bytes to be
106 * read from the contained input stream. 52 * read from the contained input stream.
107 * 53 *
108 * @see java.io.BufferedInputStream#buf 54 * @see java.io.BufferedInputStream#buf
109 */ 55 */
110 protected int pos; 56 private int pos;
111 57
112 /** 58 private void checkClosed() throws IOException {
113 * The value of the <code>pos</code> field at the time the last 59 if (isClosed)
114 * <code>mark</code> method was called.
115 * <p>
116 * This value is always
117 * in the range <code>-1</code> through <code>pos</code>.
118 * If there is no marked position in the input
119 * stream, this field is <code>-1</code>. If
120 * there is a marked position in the input
121 * stream, then <code>buf[markpos]</code>
122 * is the first byte to be supplied as input
123 * after a <code>reset</code> operation. If
124 * <code>markpos</code> is not <code>-1</code>,
125 * then all bytes from positions <code>buf[markpos]</code>
126 * through <code>buf[pos-1]</code> must remain
127 * in the buffer array (though they may be
128 * moved to another place in the buffer array,
129 * with suitable adjustments to the values
130 * of <code>count</code>, <code>pos</code>,
131 * and <code>markpos</code>); they may not
132 * be discarded unless and until the difference
133 * between <code>pos</code> and <code>markpos</code>
134 * exceeds <code>marklimit</code>.
135 *
136 * @see java.io.BufferedInputStream#mark(int)
137 * @see java.io.BufferedInputStream#pos
138 */
139 protected int markpos = -1;
140
141 /**
142 * The maximum read ahead allowed after a call to the
143 * <code>mark</code> method before subsequent calls to the
144 * <code>reset</code> method fail.
145 * Whenever the difference between <code>pos</code>
146 * and <code>markpos</code> exceeds <code>marklimit</code>,
147 * then the mark may be dropped by setting
148 * <code>markpos</code> to <code>-1</code>.
149 *
150 * @see java.io.BufferedInputStream#mark(int)
151 * @see java.io.BufferedInputStream#reset()
152 */
153 protected int marklimit;
154
155 /**
156 * Check to make sure that underlying input stream has not been
157 * nulled out due to close; if not return it;
158 */
159 private InputStream getInIfOpen() throws IOException {
160 InputStream input = in;
161 if (input == null)
162 throw new IOException("Stream closed"); 60 throw new IOException("Stream closed");
163 return input;
164 }
165
166 /**
167 * Check to make sure that buffer has not been nulled out due to
168 * close; if not return it;
169 */
170 private byte[] getBufIfOpen() throws IOException {
171 byte[] buffer = buf;
172 if (buffer == null)
173 throw new IOException("Stream closed");
174 return buffer;
175 } 61 }
176 62
177 /** 63 /**
178 * Creates a <code>BufferedInputStream</code> 64 * Creates a <code>BufferedInputStream</code>
179 * and saves its argument, the input stream 65 * and saves its argument, the input stream
194 * buffer array of length <code>size</code> 80 * buffer array of length <code>size</code>
195 * is created and stored in <code>buf</code>. 81 * is created and stored in <code>buf</code>.
196 * 82 *
197 * @param in the underlying input stream. 83 * @param in the underlying input stream.
198 * @param size the buffer size. 84 * @param size the buffer size.
199 * @exception IllegalArgumentException if {@code size <= 0}.
200 */ 85 */
201 public BufferedInputStream(InputStream in, int size) { 86 public BufferedInputStream(InputStream in, int size) {
202 super(in); 87 super(in);
203 if (size <= 0) {
204 throw new IllegalArgumentException("Buffer size <= 0");
205 }
206 buf = new byte[size]; 88 buf = new byte[size];
207 } 89 }
208 90
209 /** 91 /**
210 * Fills the buffer with more data, taking into account 92 * Fills the buffer with more data.
211 * shuffling and other tricks for dealing with marks.
212 * Assumes that it is being called by a synchronized method. 93 * Assumes that it is being called by a synchronized method.
213 * This method also assumes that all data has already been read in, 94 * This method also assumes that all data has already been read in,
214 * hence pos > count. 95 * hence pos > count.
215 */ 96 */
216 private void fill() throws IOException { 97 private void fill() throws IOException {
217 byte[] buffer = getBufIfOpen(); 98 pos = 0;
218 if (markpos < 0) 99 count = 0;
219 pos = 0; /* no mark: throw away the buffer */ 100 int n = super.read(buf, 0, buf.length);
220 else if (pos >= buffer.length) /* no room left in buffer */
221 if (markpos > 0) { /* can throw away early part of the buffer */
222 int sz = pos - markpos;
223 System.arraycopy(buffer, markpos, buffer, 0, sz);
224 pos = sz;
225 markpos = 0;
226 } else if (buffer.length >= marklimit) {
227 markpos = -1; /* buffer got too big, invalidate mark */
228 pos = 0; /* drop buffer contents */
229 } else if (buffer.length >= MAX_BUFFER_SIZE) {
230 throw new OutOfMemoryError("Required array size too large");
231 } else { /* grow buffer */
232 int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
233 pos * 2 : MAX_BUFFER_SIZE;
234 if (nsz > marklimit)
235 nsz = marklimit;
236 byte nbuf[] = new byte[nsz];
237 System.arraycopy(buffer, 0, nbuf, 0, pos);
238 if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
239 // Can't replace buf if there was an async close.
240 // Note: This would need to be changed if fill()
241 // is ever made accessible to multiple threads.
242 // But for now, the only way CAS can fail is via close.
243 // assert buf == null;
244 throw new IOException("Stream closed");
245 }
246 buffer = nbuf;
247 }
248 count = pos;
249 int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
250 if (n > 0) 101 if (n > 0)
251 count = n + pos; 102 count = n;
252 } 103 }
253 104
254 /** 105 /**
255 * See 106 * See
256 * the general contract of the <code>read</code> 107 * the general contract of the <code>read</code>
262 * invoking its {@link #close()} method, 113 * invoking its {@link #close()} method,
263 * or an I/O error occurs. 114 * or an I/O error occurs.
264 * @see java.io.FilterInputStream#in 115 * @see java.io.FilterInputStream#in
265 */ 116 */
266 public synchronized int read() throws IOException { 117 public synchronized int read() throws IOException {
118 checkClosed();
267 if (pos >= count) { 119 if (pos >= count) {
268 fill(); 120 fill();
269 if (pos >= count) 121 if (pos >= count)
270 return -1; 122 return -1;
271 } 123 }
272 return getBufIfOpen()[pos++] & 0xff; 124 return buf[pos++] & 0xff;
273 } 125 }
274 126
275 /** 127 /**
276 * Read characters into a portion of an array, reading from the underlying 128 * Read characters into a portion of an array, reading from the underlying
277 * stream at most once if necessary. 129 * stream at most once if necessary.
278 */ 130 */
279 private int read1(byte[] b, int off, int len) throws IOException { 131 public synchronized int read(byte[] b, int off, int len) throws IOException {
132 checkClosed();
280 int avail = count - pos; 133 int avail = count - pos;
281 if (avail <= 0) { 134 if (avail <= 0) {
282 /* If the requested length is at least as large as the buffer, and 135 /* If the requested length is at least as large as the buffer, do not bother to copy the
283 if there is no mark/reset activity, do not bother to copy the
284 bytes into the local buffer. In this way buffered streams will 136 bytes into the local buffer. In this way buffered streams will
285 cascade harmlessly. */ 137 cascade harmlessly. */
286 if (len >= getBufIfOpen().length && markpos < 0) { 138 if (len >= buf.length) {
287 return getInIfOpen().read(b, off, len); 139 return super.read(b, off, len);
288 } 140 }
289 fill(); 141 fill();
290 avail = count - pos; 142 avail = count - pos;
291 if (avail <= 0) return -1; 143 if (avail <= 0) return -1;
292 } 144 }
293 int cnt = (avail < len) ? avail : len; 145 int cnt = (avail < len) ? avail : len;
294 System.arraycopy(getBufIfOpen(), pos, b, off, cnt); 146 System.arraycopy(buf, pos, b, off, cnt);
295 pos += cnt; 147 pos += cnt;
296 return cnt; 148 return cnt;
297 }
298
299 /**
300 * Reads bytes from this byte-input stream into the specified byte array,
301 * starting at the given offset.
302 *
303 * <p> This method implements the general contract of the corresponding
304 * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
305 * the <code>{@link InputStream}</code> class. As an additional
306 * convenience, it attempts to read as many bytes as possible by repeatedly
307 * invoking the <code>read</code> method of the underlying stream. This
308 * iterated <code>read</code> continues until one of the following
309 * conditions becomes true: <ul>
310 *
311 * <li> The specified number of bytes have been read,
312 *
313 * <li> The <code>read</code> method of the underlying stream returns
314 * <code>-1</code>, indicating end-of-file, or
315 *
316 * <li> The <code>available</code> method of the underlying stream
317 * returns zero, indicating that further input requests would block.
318 *
319 * </ul> If the first <code>read</code> on the underlying stream returns
320 * <code>-1</code> to indicate end-of-file then this method returns
321 * <code>-1</code>. Otherwise this method returns the number of bytes
322 * actually read.
323 *
324 * <p> Subclasses of this class are encouraged, but not required, to
325 * attempt to read as many bytes as possible in the same fashion.
326 *
327 * @param b destination buffer.
328 * @param off offset at which to start storing bytes.
329 * @param len maximum number of bytes to read.
330 * @return the number of bytes read, or <code>-1</code> if the end of
331 * the stream has been reached.
332 * @exception IOException if this input stream has been closed by
333 * invoking its {@link #close()} method,
334 * or an I/O error occurs.
335 */
336 public synchronized int read(byte b[], int off, int len)
337 throws IOException
338 {
339 getBufIfOpen(); // Check for closed stream
340 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
341 throw new IndexOutOfBoundsException();
342 } else if (len == 0) {
343 return 0;
344 }
345
346 int n = 0;
347 for (;;) {
348 int nread = read1(b, off + n, len - n);
349 if (nread <= 0)
350 return (n == 0) ? nread : n;
351 n += nread;
352 if (n >= len)
353 return n;
354 // if not closed but no bytes available, return
355 InputStream input = in;
356 if (input != null && input.available() <= 0)
357 return n;
358 }
359 } 149 }
360 150
361 /** 151 /**
362 * See the general contract of the <code>skip</code> 152 * See the general contract of the <code>skip</code>
363 * method of <code>InputStream</code>. 153 * method of <code>InputStream</code>.
366 * or if this input stream has been closed by 156 * or if this input stream has been closed by
367 * invoking its {@link #close()} method, or an 157 * invoking its {@link #close()} method, or an
368 * I/O error occurs. 158 * I/O error occurs.
369 */ 159 */
370 public synchronized long skip(long n) throws IOException { 160 public synchronized long skip(long n) throws IOException {
371 getBufIfOpen(); // Check for closed stream 161 checkClosed();
372 if (n <= 0) { 162 long avail = count - pos;
373 return 0; 163 if (avail <= 0) {
164 return super.skip(n);
374 } 165 }
375 long avail = count - pos;
376
377 if (avail <= 0) {
378 // If no mark position set then don't keep in buffer
379 if (markpos <0)
380 return getInIfOpen().skip(n);
381
382 // Fill in buffer to save bytes for reset
383 fill();
384 avail = count - pos;
385 if (avail <= 0)
386 return 0;
387 }
388
389 long skipped = (avail < n) ? avail : n; 166 long skipped = (avail < n) ? avail : n;
390 pos += skipped; 167 pos += skipped;
391 return skipped; 168 return skipped;
392 } 169 }
393 170
408 * invoking its {@link #close()} method, 185 * invoking its {@link #close()} method,
409 * or an I/O error occurs. 186 * or an I/O error occurs.
410 */ 187 */
411 public synchronized int available() throws IOException { 188 public synchronized int available() throws IOException {
412 int n = count - pos; 189 int n = count - pos;
413 int avail = getInIfOpen().available(); 190 int avail = super.available();
414 return n > (Integer.MAX_VALUE - avail) 191 return n > (Integer.MAX_VALUE - avail)
415 ? Integer.MAX_VALUE 192 ? Integer.MAX_VALUE
416 : n + avail; 193 : n + avail;
417 } 194 }
418 195
419 /** 196 public void mark(int readlimit) {}
420 * See the general contract of the <code>mark</code> 197
421 * method of <code>InputStream</code>. 198 public void reset() throws IOException {
422 * 199 throw new IOException("mark/reset not supported");
423 * @param readlimit the maximum limit of bytes that can be read before 200 }
424 * the mark position becomes invalid. 201
425 * @see java.io.BufferedInputStream#reset()
426 */
427 public synchronized void mark(int readlimit) {
428 marklimit = readlimit;
429 markpos = pos;
430 }
431
432 /**
433 * See the general contract of the <code>reset</code>
434 * method of <code>InputStream</code>.
435 * <p>
436 * If <code>markpos</code> is <code>-1</code>
437 * (no mark has been set or the mark has been
438 * invalidated), an <code>IOException</code>
439 * is thrown. Otherwise, <code>pos</code> is
440 * set equal to <code>markpos</code>.
441 *
442 * @exception IOException if this stream has not been marked or,
443 * if the mark has been invalidated, or the stream
444 * has been closed by invoking its {@link #close()}
445 * method, or an I/O error occurs.
446 * @see java.io.BufferedInputStream#mark(int)
447 */
448 public synchronized void reset() throws IOException {
449 getBufIfOpen(); // Cause exception if closed
450 if (markpos < 0)
451 throw new IOException("Resetting to invalid mark");
452 pos = markpos;
453 }
454
455 /**
456 * Tests if this input stream supports the <code>mark</code>
457 * and <code>reset</code> methods. The <code>markSupported</code>
458 * method of <code>BufferedInputStream</code> returns
459 * <code>true</code>.
460 *
461 * @return a <code>boolean</code> indicating if this stream type supports
462 * the <code>mark</code> and <code>reset</code> methods.
463 * @see java.io.InputStream#mark(int)
464 * @see java.io.InputStream#reset()
465 */
466 public boolean markSupported() { 202 public boolean markSupported() {
467 return true; 203 return false;
468 } 204 }
469 205
470 /** 206 /**
471 * Closes this input stream and releases any system resources 207 * Closes this input stream and releases any system resources
472 * associated with the stream. 208 * associated with the stream.
474 * or skip() invocations will throw an IOException. 210 * or skip() invocations will throw an IOException.
475 * Closing a previously closed stream has no effect. 211 * Closing a previously closed stream has no effect.
476 * 212 *
477 * @exception IOException if an I/O error occurs. 213 * @exception IOException if an I/O error occurs.
478 */ 214 */
479 public void close() throws IOException { 215 public synchronized void close() throws IOException {
480 byte[] buffer; 216 isClosed = true;
481 while ( (buffer = buf) != null) { 217 super.close();
482 if (bufUpdater.compareAndSet(this, buffer, null)) {
483 InputStream input = in;
484 in = null;
485 if (input != null)
486 input.close();
487 return;
488 }
489 // Else retry in case a new buf was CASed in fill()
490 }
491 } 218 }
492 } 219 }