changeset 1480:1f41e5921090

input buffering
author Franklin Schmidt <fschmidt@gmail.com>
date Fri, 24 Apr 2020 14:32:20 -0600 (2020-04-24)
parents bd13aaeaf6d4
children 3cff066f3bbc
files src/goodjava/io/BufferedInputStream.java src/goodjava/io/LimitedInputStream.java src/goodjava/lucene/logging/LogFile.java
diffstat 3 files changed, 101 insertions(+), 312 deletions(-) [+]
line wrap: on
line diff
--- a/src/goodjava/io/BufferedInputStream.java	Fri Apr 24 10:52:54 2020 -0600
+++ b/src/goodjava/io/BufferedInputStream.java	Fri Apr 24 14:32:20 2020 -0600
@@ -1,26 +1,6 @@
 /*
  * Copyright (c) 1994, 2013, Oracle and/or its affiliates. All rights reserved.
  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
- *
  */
 
 package goodjava.io;
@@ -28,58 +8,24 @@
 import java.io.InputStream;
 import java.io.FilterInputStream;
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
  * A <code>BufferedInputStream</code> adds
  * functionality to another input stream-namely,
- * the ability to buffer the input and to
- * support the <code>mark</code> and <code>reset</code>
- * methods. When  the <code>BufferedInputStream</code>
+ * the ability to buffer the input. When  the <code>BufferedInputStream</code>
  * is created, an internal buffer array is
  * created. As bytes  from the stream are read
  * or skipped, the internal buffer is refilled
  * as necessary  from the contained input stream,
- * many bytes at a time. The <code>mark</code>
- * operation  remembers a point in the input
- * stream and the <code>reset</code> operation
- * causes all the  bytes read since the most
- * recent <code>mark</code> operation to be
- * reread before new bytes are  taken from
- * the contained input stream.
+ * many bytes at a time.
  *
  * @author  Arthur van Hoff
  * @since   JDK1.0
  */
-public class BufferedInputStream extends FilterInputStream {
-
+public final class BufferedInputStream extends FilterInputStream {
 	private static int DEFAULT_BUFFER_SIZE = 8192;
-
-	/**
-	 * The maximum size of array to allocate.
-	 * Some VMs reserve some header words in an array.
-	 * Attempts to allocate larger arrays may result in
-	 * OutOfMemoryError: Requested array size exceeds VM limit
-	 */
-	private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8;
-
-	/**
-	 * The internal buffer array where the data is stored. When necessary,
-	 * it may be replaced by another array of
-	 * a different size.
-	 */
-	protected volatile byte buf[];
-
-	/**
-	 * Atomic updater to provide compareAndSet for buf. This is
-	 * necessary because closes can be asynchronous. We use nullness
-	 * of buf[] as primary indicator that this stream is closed. (The
-	 * "in" field is also nulled out on close.)
-	 */
-	private static final
-		AtomicReferenceFieldUpdater<BufferedInputStream, byte[]> bufUpdater =
-		AtomicReferenceFieldUpdater.newUpdater
-		(BufferedInputStream.class,  byte[].class, "buf");
+	private final byte buf[];
+	private boolean isClosed = false;
 
 	/**
 	 * The index one greater than the index of the last valid byte in
@@ -88,9 +34,9 @@
 	 * in the range <code>0</code> through <code>buf.length</code>;
 	 * elements <code>buf[0]</code>  through <code>buf[count-1]
 	 * </code>contain buffered input data obtained
-	 * from the underlying  input stream.
+	 * from the underlying input stream.
 	 */
-	protected int count;
+	private int count;
 
 	/**
 	 * The current position in the buffer. This is the index of the next
@@ -107,71 +53,11 @@
 	 *
 	 * @see     java.io.BufferedInputStream#buf
 	 */
-	protected int pos;
-
-	/**
-	 * The value of the <code>pos</code> field at the time the last
-	 * <code>mark</code> method was called.
-	 * <p>
-	 * This value is always
-	 * in the range <code>-1</code> through <code>pos</code>.
-	 * If there is no marked position in  the input
-	 * stream, this field is <code>-1</code>. If
-	 * there is a marked position in the input
-	 * stream,  then <code>buf[markpos]</code>
-	 * is the first byte to be supplied as input
-	 * after a <code>reset</code> operation. If
-	 * <code>markpos</code> is not <code>-1</code>,
-	 * then all bytes from positions <code>buf[markpos]</code>
-	 * through  <code>buf[pos-1]</code> must remain
-	 * in the buffer array (though they may be
-	 * moved to  another place in the buffer array,
-	 * with suitable adjustments to the values
-	 * of <code>count</code>,  <code>pos</code>,
-	 * and <code>markpos</code>); they may not
-	 * be discarded unless and until the difference
-	 * between <code>pos</code> and <code>markpos</code>
-	 * exceeds <code>marklimit</code>.
-	 *
-	 * @see     java.io.BufferedInputStream#mark(int)
-	 * @see     java.io.BufferedInputStream#pos
-	 */
-	protected int markpos = -1;
+	private int pos;
 
-	/**
-	 * The maximum read ahead allowed after a call to the
-	 * <code>mark</code> method before subsequent calls to the
-	 * <code>reset</code> method fail.
-	 * Whenever the difference between <code>pos</code>
-	 * and <code>markpos</code> exceeds <code>marklimit</code>,
-	 * then the  mark may be dropped by setting
-	 * <code>markpos</code> to <code>-1</code>.
-	 *
-	 * @see     java.io.BufferedInputStream#mark(int)
-	 * @see     java.io.BufferedInputStream#reset()
-	 */
-	protected int marklimit;
-
-	/**
-	 * Check to make sure that underlying input stream has not been
-	 * nulled out due to close; if not return it;
-	 */
-	private InputStream getInIfOpen() throws IOException {
-		InputStream input = in;
-		if (input == null)
+	private void checkClosed() throws IOException {
+		if (isClosed)
 			throw new IOException("Stream closed");
-		return input;
-	}
-
-	/**
-	 * Check to make sure that buffer has not been nulled out due to
-	 * close; if not return it;
-	 */
-	private byte[] getBufIfOpen() throws IOException {
-		byte[] buffer = buf;
-		if (buffer == null)
-			throw new IOException("Stream closed");
-		return buffer;
 	}
 
 	/**
@@ -196,59 +82,24 @@
 	 *
 	 * @param   in     the underlying input stream.
 	 * @param   size   the buffer size.
-	 * @exception IllegalArgumentException if {@code size <= 0}.
 	 */
 	public BufferedInputStream(InputStream in, int size) {
 		super(in);
-		if (size <= 0) {
-			throw new IllegalArgumentException("Buffer size <= 0");
-		}
 		buf = new byte[size];
 	}
 
 	/**
-	 * Fills the buffer with more data, taking into account
-	 * shuffling and other tricks for dealing with marks.
+	 * Fills the buffer with more data.
 	 * Assumes that it is being called by a synchronized method.
 	 * This method also assumes that all data has already been read in,
 	 * hence pos > count.
 	 */
 	private void fill() throws IOException {
-		byte[] buffer = getBufIfOpen();
-		if (markpos < 0)
-			pos = 0;            /* no mark: throw away the buffer */
-		else if (pos >= buffer.length)  /* no room left in buffer */
-			if (markpos > 0) {  /* can throw away early part of the buffer */
-				int sz = pos - markpos;
-				System.arraycopy(buffer, markpos, buffer, 0, sz);
-				pos = sz;
-				markpos = 0;
-			} else if (buffer.length >= marklimit) {
-				markpos = -1;   /* buffer got too big, invalidate mark */
-				pos = 0;        /* drop buffer contents */
-			} else if (buffer.length >= MAX_BUFFER_SIZE) {
-				throw new OutOfMemoryError("Required array size too large");
-			} else {            /* grow buffer */
-				int nsz = (pos <= MAX_BUFFER_SIZE - pos) ?
-						pos * 2 : MAX_BUFFER_SIZE;
-				if (nsz > marklimit)
-					nsz = marklimit;
-				byte nbuf[] = new byte[nsz];
-				System.arraycopy(buffer, 0, nbuf, 0, pos);
-				if (!bufUpdater.compareAndSet(this, buffer, nbuf)) {
-					// Can't replace buf if there was an async close.
-					// Note: This would need to be changed if fill()
-					// is ever made accessible to multiple threads.
-					// But for now, the only way CAS can fail is via close.
-					// assert buf == null;
-					throw new IOException("Stream closed");
-				}
-				buffer = nbuf;
-			}
-		count = pos;
-		int n = getInIfOpen().read(buffer, pos, buffer.length - pos);
+		pos = 0;
+		count = 0;
+		int n = super.read(buf, 0, buf.length);
 		if (n > 0)
-			count = n + pos;
+			count = n;
 	}
 
 	/**
@@ -264,101 +115,40 @@
 	 * @see        java.io.FilterInputStream#in
 	 */
 	public synchronized int read() throws IOException {
+		checkClosed();
 		if (pos >= count) {
 			fill();
 			if (pos >= count)
 				return -1;
 		}
-		return getBufIfOpen()[pos++] & 0xff;
+		return buf[pos++] & 0xff;
 	}
 
 	/**
 	 * Read characters into a portion of an array, reading from the underlying
 	 * stream at most once if necessary.
 	 */
-	private int read1(byte[] b, int off, int len) throws IOException {
+	public synchronized int read(byte[] b, int off, int len) throws IOException {
+		checkClosed();
 		int avail = count - pos;
 		if (avail <= 0) {
-			/* If the requested length is at least as large as the buffer, and
-			   if there is no mark/reset activity, do not bother to copy the
+			/* If the requested length is at least as large as the buffer, do not bother to copy the
 			   bytes into the local buffer.  In this way buffered streams will
 			   cascade harmlessly. */
-			if (len >= getBufIfOpen().length && markpos < 0) {
-				return getInIfOpen().read(b, off, len);
+			if (len >= buf.length) {
+				return super.read(b, off, len);
 			}
 			fill();
 			avail = count - pos;
 			if (avail <= 0) return -1;
 		}
 		int cnt = (avail < len) ? avail : len;
-		System.arraycopy(getBufIfOpen(), pos, b, off, cnt);
+		System.arraycopy(buf, pos, b, off, cnt);
 		pos += cnt;
 		return cnt;
 	}
 
 	/**
-	 * Reads bytes from this byte-input stream into the specified byte array,
-	 * starting at the given offset.
-	 *
-	 * <p> This method implements the general contract of the corresponding
-	 * <code>{@link InputStream#read(byte[], int, int) read}</code> method of
-	 * the <code>{@link InputStream}</code> class.  As an additional
-	 * convenience, it attempts to read as many bytes as possible by repeatedly
-	 * invoking the <code>read</code> method of the underlying stream.  This
-	 * iterated <code>read</code> continues until one of the following
-	 * conditions becomes true: <ul>
-	 *
-	 *   <li> The specified number of bytes have been read,
-	 *
-	 *   <li> The <code>read</code> method of the underlying stream returns
-	 *   <code>-1</code>, indicating end-of-file, or
-	 *
-	 *   <li> The <code>available</code> method of the underlying stream
-	 *   returns zero, indicating that further input requests would block.
-	 *
-	 * </ul> If the first <code>read</code> on the underlying stream returns
-	 * <code>-1</code> to indicate end-of-file then this method returns
-	 * <code>-1</code>.  Otherwise this method returns the number of bytes
-	 * actually read.
-	 *
-	 * <p> Subclasses of this class are encouraged, but not required, to
-	 * attempt to read as many bytes as possible in the same fashion.
-	 *
-	 * @param      b     destination buffer.
-	 * @param      off   offset at which to start storing bytes.
-	 * @param      len   maximum number of bytes to read.
-	 * @return     the number of bytes read, or <code>-1</code> if the end of
-	 *             the stream has been reached.
-	 * @exception  IOException  if this input stream has been closed by
-	 *                          invoking its {@link #close()} method,
-	 *                          or an I/O error occurs.
-	 */
-	public synchronized int read(byte b[], int off, int len)
-		throws IOException
-	{
-		getBufIfOpen(); // Check for closed stream
-		if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
-			throw new IndexOutOfBoundsException();
-		} else if (len == 0) {
-			return 0;
-		}
-
-		int n = 0;
-		for (;;) {
-			int nread = read1(b, off + n, len - n);
-			if (nread <= 0)
-				return (n == 0) ? nread : n;
-			n += nread;
-			if (n >= len)
-				return n;
-			// if not closed but no bytes available, return
-			InputStream input = in;
-			if (input != null && input.available() <= 0)
-				return n;
-		}
-	}
-
-	/**
 	 * See the general contract of the <code>skip</code>
 	 * method of <code>InputStream</code>.
 	 *
@@ -368,24 +158,11 @@
 	 *                          I/O error occurs.
 	 */
 	public synchronized long skip(long n) throws IOException {
-		getBufIfOpen(); // Check for closed stream
-		if (n <= 0) {
-			return 0;
-		}
+		checkClosed();
 		long avail = count - pos;
-
 		if (avail <= 0) {
-			// If no mark position set then don't keep in buffer
-			if (markpos <0)
-				return getInIfOpen().skip(n);
-
-			// Fill in buffer to save bytes for reset
-			fill();
-			avail = count - pos;
-			if (avail <= 0)
-				return 0;
+			return super.skip(n);
 		}
-
 		long skipped = (avail < n) ? avail : n;
 		pos += skipped;
 		return skipped;
@@ -410,61 +187,20 @@
 	 */
 	public synchronized int available() throws IOException {
 		int n = count - pos;
-		int avail = getInIfOpen().available();
+		int avail = super.available();
 		return n > (Integer.MAX_VALUE - avail)
 					? Integer.MAX_VALUE
 					: n + avail;
 	}
 
-	/**
-	 * See the general contract of the <code>mark</code>
-	 * method of <code>InputStream</code>.
-	 *
-	 * @param   readlimit   the maximum limit of bytes that can be read before
-	 *                      the mark position becomes invalid.
-	 * @see     java.io.BufferedInputStream#reset()
-	 */
-	public synchronized void mark(int readlimit) {
-		marklimit = readlimit;
-		markpos = pos;
+	public void mark(int readlimit) {}
+
+	public void reset() throws IOException {
+		throw new IOException("mark/reset not supported");
 	}
 
-	/**
-	 * See the general contract of the <code>reset</code>
-	 * method of <code>InputStream</code>.
-	 * <p>
-	 * If <code>markpos</code> is <code>-1</code>
-	 * (no mark has been set or the mark has been
-	 * invalidated), an <code>IOException</code>
-	 * is thrown. Otherwise, <code>pos</code> is
-	 * set equal to <code>markpos</code>.
-	 *
-	 * @exception  IOException  if this stream has not been marked or,
-	 *                  if the mark has been invalidated, or the stream
-	 *                  has been closed by invoking its {@link #close()}
-	 *                  method, or an I/O error occurs.
-	 * @see        java.io.BufferedInputStream#mark(int)
-	 */
-	public synchronized void reset() throws IOException {
-		getBufIfOpen(); // Cause exception if closed
-		if (markpos < 0)
-			throw new IOException("Resetting to invalid mark");
-		pos = markpos;
-	}
-
-	/**
-	 * Tests if this input stream supports the <code>mark</code>
-	 * and <code>reset</code> methods. The <code>markSupported</code>
-	 * method of <code>BufferedInputStream</code> returns
-	 * <code>true</code>.
-	 *
-	 * @return  a <code>boolean</code> indicating if this stream type supports
-	 *          the <code>mark</code> and <code>reset</code> methods.
-	 * @see     java.io.InputStream#mark(int)
-	 * @see     java.io.InputStream#reset()
-	 */
 	public boolean markSupported() {
-		return true;
+		return false;
 	}
 
 	/**
@@ -476,17 +212,8 @@
 	 *
 	 * @exception  IOException  if an I/O error occurs.
 	 */
-	public void close() throws IOException {
-		byte[] buffer;
-		while ( (buffer = buf) != null) {
-			if (bufUpdater.compareAndSet(this, buffer, null)) {
-				InputStream input = in;
-				in = null;
-				if (input != null)
-					input.close();
-				return;
-			}
-			// Else retry in case a new buf was CASed in fill()
-		}
+	public synchronized void close() throws IOException {
+		isClosed = true;
+		super.close();
 	}
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/goodjava/io/LimitedInputStream.java	Fri Apr 24 14:32:20 2020 -0600
@@ -0,0 +1,58 @@
+package goodjava.io;
+import java.io.InputStream;
+import java.io.FilterInputStream;
+import java.io.IOException;
+
+
+public final class LimitedInputStream extends FilterInputStream {
+	private final long limit;
+	private long pos = 0;
+
+	public LimitedInputStream(InputStream in, long limit) {
+		super(in);
+		this.limit = limit;
+	}
+
+	public synchronized int read() throws IOException {
+		if( pos >= limit )
+			return -1;
+		int n = super.read();
+		if( n != -1 )
+			pos++;
+		return n;
+	}
+
+	public synchronized int read(byte[] b, int off, int len) throws IOException {
+		long avail = limit - pos;
+		if( avail <= 0 )
+			return -1;
+		if( len > avail )
+			len = (int)avail;
+		int n = super.read(b,off,len);
+		if( n > 0 )
+			pos += n;
+		return n;
+	}
+
+	public synchronized long skip(long n) throws IOException {
+		long avail = limit - pos;
+		if( avail <= 0 )
+			return 0;
+		if( n > avail )
+			n = (int)avail;
+		n = super.skip(n);
+		pos += n;
+		return n;
+	}
+
+	public synchronized int available() throws IOException {
+		long avail = limit - pos;
+		if( avail <= 0 )
+			return 0;
+		int n = super.available();
+		if( n > avail )
+			n = (int)avail;
+		return n;
+	}
+
+}
--- a/src/goodjava/lucene/logging/LogFile.java	Fri Apr 24 10:52:54 2020 -0600
+++ b/src/goodjava/lucene/logging/LogFile.java	Fri Apr 24 14:32:20 2020 -0600
@@ -6,7 +6,7 @@
 import java.io.DataOutputStream;
 import java.io.RandomAccessFile;
 import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +24,8 @@
 import org.apache.lucene.util.BytesRef;
 import goodjava.logging.Logger;
 import goodjava.logging.LoggerFactory;
+import goodjava.io.LimitedInputStream;
+import goodjava.io.BufferedInputStream;
 
 
 public class LogFile {
@@ -72,10 +74,12 @@
 	}
 
 	public LogInputStream input() throws IOException {
-		byte[] a = new byte[(int)end - 8];
-		raf.seek(8L);
-		raf.readFully(a);
-		return newLogInputStream(new ByteArrayInputStream(a));
+		InputStream in = new FileInputStream(file);
+		in = new LimitedInputStream(in,end);
+		in = new BufferedInputStream(in);
+		LogInputStream lis = newLogInputStream(in);
+		lis.readLong();  // skip end
+		return lis;
 	}
 
 	protected LogInputStream newLogInputStream(InputStream in) {